aboutsummaryrefslogtreecommitdiff
path: root/nxcomp/Proxy.cpp
diff options
context:
space:
mode:
authorMike Gabriel <mike.gabriel@das-netzwerkteam.de>2017-06-30 20:13:51 +0200
committerMike Gabriel <mike.gabriel@das-netzwerkteam.de>2017-07-26 10:12:43 +0200
commitf76c82403888bb498973ec974dbfd20e4edb02fe (patch)
treebe0cb6c112d9d9fb46387fbd114727510197ddec /nxcomp/Proxy.cpp
parent9193d11eeeea933e293acd5e0f03fa4e9887186b (diff)
downloadnx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.gz
nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.bz2
nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.zip
nxcomp: Switch to autoreconf.
Diffstat (limited to 'nxcomp/Proxy.cpp')
-rw-r--r--nxcomp/Proxy.cpp6525
1 files changed, 0 insertions, 6525 deletions
diff --git a/nxcomp/Proxy.cpp b/nxcomp/Proxy.cpp
deleted file mode 100644
index 3a7a42362..000000000
--- a/nxcomp/Proxy.cpp
+++ /dev/null
@@ -1,6525 +0,0 @@
-/**************************************************************************/
-/* */
-/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */
-/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */
-/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */
-/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */
-/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
-/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */
-/* */
-/* NXCOMP, NX protocol compression and NX extensions to this software */
-/* are copyright of the aforementioned persons and companies. */
-/* */
-/* Redistribution and use of the present software is allowed according */
-/* to terms specified in the file LICENSE.nxcomp which comes in the */
-/* source distribution. */
-/* */
-/* All rights reserved. */
-/* */
-/* NOTE: This software has received contributions from various other */
-/* contributors, only the core maintainers and supporters are listed as */
-/* copyright holders. Please contact us, if you feel you should be listed */
-/* as copyright holder, as well. */
-/* */
-/**************************************************************************/
-
-#include <cstdio>
-#include <unistd.h>
-#include <cstdlib>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <signal.h>
-
-#ifdef ANDROID
-#include <netinet/in.h>
-#include <netinet/ip.h>
-#include <netinet/tcp.h>
-#endif
-
-#include "Misc.h"
-
-#if defined(__CYGWIN32__) || defined(__APPLE__) || defined(__FreeBSD__) || defined(__sun)
-#include <netinet/in_systm.h>
-#endif
-
-#ifndef ANDROID
-#include <netinet/in.h>
-#include <netinet/ip.h>
-#include <netinet/tcp.h>
-#endif
-
-#include "NXalert.h"
-#include "NXvars.h"
-
-#include "Proxy.h"
-
-#include "Socket.h"
-#include "Channel.h"
-#include "Statistics.h"
-
-#include "ClientChannel.h"
-#include "ServerChannel.h"
-#include "GenericChannel.h"
-#include "ChannelEndPoint.h"
-
-//
-// We need to adjust some values related
-// to these messages at the time the mes-
-// sage stores are reconfigured.
-//
-
-#include "PutImage.h"
-#include "ChangeGC.h"
-#include "PolyFillRectangle.h"
-#include "PutPackedImage.h"
-
-//
-// This is from the main loop.
-//
-
-extern void CleanupListeners();
-
-extern int HandleChild(int);
-
-//
-// Default size of string buffers.
-//
-
-#define DEFAULT_STRING_LENGTH 512
-
-//
-// Set the verbosity level. You also need
-// to define DUMP in Misc.cpp if DUMP is
-// defined here.
-//
-
-#define WARNING
-#define PANIC
-#undef TEST
-#undef DEBUG
-#undef DUMP
-
-//
-// Log the important tracepoints related
-// to writing packets to the peer proxy.
-//
-
-#undef FLUSH
-
-//
-// Log the operations related to splits.
-//
-
-#undef SPLIT
-
-//
-// Log the operations related to sending
-// and receiving the control tokens.
-//
-
-#undef TOKEN
-
-//
-// Log the operations related to setting
-// the token limits.
-//
-
-#undef LIMIT
-
-//
-// Log a warning if no data is written by
-// the proxy within a timeout.
-//
-
-#undef TIME
-
-//
-// Log the operation related to generating
-// the ping message at idle time.
-//
-
-#undef PING
-
-Proxy::Proxy(int fd)
-
- : transport_(new ProxyTransport(fd)), fd_(fd), readBuffer_(transport_)
-{
- for (int channelId = 0;
- channelId < CONNECTIONS_LIMIT;
- channelId++)
- {
- channels_[channelId] = NULL;
- transports_[channelId] = NULL;
- congestions_[channelId] = 0;
-
- fdMap_[channelId] = nothing;
- channelMap_[channelId] = nothing;
- slavePidMap_[channelId] = nothing;
- }
-
- inputChannel_ = nothing;
- outputChannel_ = nothing;
-
- controlLength_ = 0;
-
- operation_ = operation_in_negotiation;
-
- draining_ = 0;
- priority_ = 0;
- finish_ = 0;
- shutdown_ = 0;
- congestion_ = 0;
-
- timer_ = 0;
- alert_ = 0;
-
- agent_ = nothing;
-
- //
- // Set null timeouts. This will require
- // a new link configuration.
- //
-
- timeouts_.split = 0;
- timeouts_.motion = 0;
-
- timeouts_.readTs = getTimestamp();
- timeouts_.writeTs = getTimestamp();
-
- timeouts_.loopTs = getTimestamp();
- timeouts_.pingTs = getTimestamp();
- timeouts_.alertTs = nullTimestamp();
- timeouts_.loadTs = nullTimestamp();
-
- timeouts_.splitTs = nullTimestamp();
- timeouts_.motionTs = nullTimestamp();
-
- //
- // Initialize the token counters. This
- // will require a new link configuration.
- //
-
- for (int i = token_control; i <= token_data; i++)
- {
- tokens_[i].size = 0;
- tokens_[i].limit = 0;
-
- tokens_[i].bytes = 0;
- tokens_[i].remaining = 0;
- }
-
- tokens_[token_control].request = code_control_token_request;
- tokens_[token_control].reply = code_control_token_reply;
- tokens_[token_control].type = token_control;
-
- tokens_[token_split].request = code_split_token_request;
- tokens_[token_split].reply = code_split_token_reply;
- tokens_[token_split].type = token_split;
-
- tokens_[token_data].request = code_data_token_request;
- tokens_[token_data].reply = code_data_token_reply;
- tokens_[token_data].type = token_data;
-
- currentStatistics_ = NULL;
-
- //
- // Create compressor and decompressor
- // for image and data payload.
- //
-
- compressor_ = new StaticCompressor(control -> LocalDataCompressionLevel,
- control -> LocalDataCompressionThreshold);
-
- //
- // Create object storing NX specific
- // opcodes.
- //
-
- opcodeStore_ = new OpcodeStore();
-
- //
- // Create the message stores.
- //
-
- clientStore_ = new ClientStore(compressor_);
- serverStore_ = new ServerStore(compressor_);
-
- clientCache_ = new ClientCache();
- serverCache_ = new ServerCache();
-
- if (clientCache_ == NULL || serverCache_ == NULL)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Failed to create the channel cache.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Failed to create the channel cache.\n";
-
- HandleCleanup();
- }
-
- //
- // Prepare for image decompression.
- //
-
- UnpackInit();
-
- #ifdef DEBUG
- *logofs << "Proxy: Created new object at " << this
- << ".\n" << logofs_flush;
- #endif
-}
-
-Proxy::~Proxy()
-{
- for (int channelId = 0;
- channelId < CONNECTIONS_LIMIT;
- channelId++)
- {
- if (channels_[channelId] != NULL)
- {
- deallocateTransport(channelId);
-
- delete channels_[channelId];
- channels_[channelId] = NULL;
- }
- }
-
- //
- // Kill all active slave channel children, and
- // give them 5 seconds to exit nicely.
-
- #ifdef DEBUG
- *logofs << "Proxy: Killing active slaves" << endl;
- #endif
-
- int slave_count = 999;
- int loop_count = 0;
-
- while(slave_count > 0 && loop_count < 50)
- {
- slave_count = 0;
-
- for (int channelId = 0; channelId<CONNECTIONS_LIMIT; channelId++)
- {
- int pid = slavePidMap_[channelId];
-
- if (pid > 1) {
- slave_count++;
-
- #ifdef DEBUG
- *logofs << "Proxy: Active slave with pid " << pid << logofs_flush;
- #endif
-
- if ( loop_count == 0 )
- {
- #ifdef DEBUG
- *logofs << "Proxy: Sending SIGTERM to " << pid << logofs_flush;
- #endif
- kill(pid, SIGTERM);
- }
- else if ( loop_count == 25 )
- {
- #ifdef DEBUG
- *logofs << "Proxy: Sending SIGKILL to " << pid << logofs_flush;
- #endif
- kill(pid, SIGKILL);
- }
-
- if (HandleChild(pid))
- {
- #ifdef DEBUG
- *logofs << "Proxy: Slave " << pid << " terminated" << logofs_flush;
- #endif
- slavePidMap_[channelId] = nothing;
- }
- }
- }
-
- if ( slave_count > 0 )
- {
- cerr << "Proxy: Error: Failed to kill all slave channel processes. " << slave_count << " processes still remaining." << endl;
- }
-
- usleep(200000);
- loop_count++;
- }
-
- delete transport_;
- delete compressor_;
-
- //
- // Delete storage shared among channels.
- //
-
- delete opcodeStore_;
-
- delete clientStore_;
- delete serverStore_;
-
- delete clientCache_;
- delete serverCache_;
-
- //
- // Get rid of the image decompression
- // resources.
- //
-
- UnpackDestroy();
-
- #ifdef DEBUG
- *logofs << "Proxy: Deleted proxy object at " << this
- << ".\n" << logofs_flush;
- #endif
-}
-
-int Proxy::setOperational()
-{
- #ifdef TEST
- *logofs << "Proxy: Entering operational mode.\n"
- << logofs_flush;
- #endif
-
- operation_ = operation_in_messages;
-
- return 1;
-}
-
-int Proxy::setReadDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax)
-{
- //
- // Set the initial timeout to the time of
- // the next ping. If the congestion count
- // is greater than zero, anyway, use a
- // shorter timeout to force a congestion
- // update.
- //
-
- if (agent_ != nothing && congestions_[agent_] == 0 &&
- statistics -> getCongestionInFrame() >= 1 &&
- tokens_[token_control].remaining >=
- (tokens_[token_control].limit - 1))
- {
- setMinTimestamp(tsMax, control -> IdleTimeout);
-
- #ifdef TEST
- *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec
- << " S and " << (double) tsMax.tv_usec /
- 1000 << " Ms with congestion "
- << statistics -> getCongestionInFrame()
- << ".\n" << logofs_flush;
- #endif
- }
- else
- {
- setMinTimestamp(tsMax, control -> PingTimeout);
-
- #ifdef TEST
- *logofs << "Proxy: Initial timeout is " << tsMax.tv_sec
- << " S and " << (double) tsMax.tv_usec /
- 1000 << " Ms.\n" << logofs_flush;
- #endif
- }
-
- int fd = -1;
-
- if (isTimeToRead() == 1)
- {
- //
- // If we don't have split tokens available
- // don't set the timeout.
- //
-
- if (tokens_[token_split].remaining > 0 &&
- isTimestamp(timeouts_.splitTs) == 1)
- {
- int diffTs = getTimeToNextSplit();
-
- #if defined(TEST) || defined(INFO) || \
- defined(FLUSH) || defined(SPLIT)
-
- if (diffTimestamp(timeouts_.splitTs,
- getTimestamp()) > timeouts_.split)
- {
- *logofs << "Proxy: FLUSH! SPLIT! WARNING! Running with "
- << diffTimestamp(timeouts_.splitTs, getTimestamp())
- << " Ms elapsed since the last split.\n"
- << logofs_flush;
- }
-
- *logofs << "Proxy: FLUSH! SPLIT! Requesting timeout of "
- << diffTs << " Ms as there are splits to send.\n"
- << logofs_flush;
-
- #endif
-
- setMinTimestamp(tsMax, diffTs);
- }
- #if defined(TEST) || defined(INFO)
- else if (isTimestamp(timeouts_.splitTs) == 1)
- {
- *logofs << "Proxy: WARNING! Not requesting a split "
- << "timeout with " << tokens_[token_split].remaining
- << " split tokens remaining.\n" << logofs_flush;
- }
- #endif
-
- //
- // Loop through the valid channels and set
- // the descriptors selected for read and
- // the timeout.
- //
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] == NULL)
- {
- continue;
- }
-
- fd = getFd(channelId);
-
- if (channels_[channelId] -> getFinish() == 0 &&
- (channels_[channelId] -> getType() == channel_x11 ||
- tokens_[token_data].remaining > 0) &&
- congestions_[channelId] == 0)
- {
- FD_SET(fd, fdSet);
-
- if (fd >= fdMax)
- {
- fdMax = fd + 1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Descriptor FD#" << fd
- << " selected for read with buffer length "
- << transports_[channelId] -> length()
- << ".\n" << logofs_flush;
- #endif
-
- //
- // Wakeup the proxy if there are motion
- // events to flush.
- //
-
- if (isTimestamp(timeouts_.motionTs) == 1)
- {
- int diffTs = getTimeToNextMotion();
-
- #if defined(TEST) || defined(INFO)
-
- if (diffTimestamp(timeouts_.motionTs,
- getTimestamp()) > timeouts_.motion)
- {
- *logofs << "Proxy: FLUSH! WARNING! Running with "
- << diffTimestamp(timeouts_.motionTs, getTimestamp())
- << " Ms elapsed since the last motion.\n"
- << logofs_flush;
- }
-
- *logofs << "Proxy: FLUSH! Requesting timeout of "
- << diffTs << " Ms as FD#" << fd << " has motion "
- << "events to send.\n" << logofs_flush;
-
- #endif
-
- setMinTimestamp(tsMax, diffTs);
- }
- }
- #if defined(TEST) || defined(INFO)
- else
- {
- if (channels_[channelId] -> getType() != channel_x11 &&
- tokens_[token_data].remaining <= 0)
- {
- *logofs << "Proxy: WARNING! Descriptor FD#" << fd
- << " not selected for read with "
- << tokens_[token_data].remaining << " data "
- << "tokens remaining.\n" << logofs_flush;
- }
- }
- #endif
- }
- }
- #if defined(TEST) || defined(INFO)
- else
- {
- *logofs << "Proxy: WARNING! Disabled reading from channels.\n"
- << logofs_flush;
-
- *logofs << "Proxy: WARNING! Congestion is " << congestion_
- << " pending " << transport_ -> pending() << " blocked "
- << transport_ -> blocked() << " length " << transport_ ->
- length() << ".\n" << logofs_flush;
- }
- #endif
-
- //
- // Include the proxy descriptor.
- //
-
- FD_SET(fd_, fdSet);
-
- if (fd_ >= fdMax)
- {
- fdMax = fd_ + 1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Proxy descriptor FD#" << fd_
- << " selected for read with buffer length "
- << transport_ -> length() << ".\n"
- << logofs_flush;
- #endif
-
- return 1;
-}
-
-//
-// Add to the mask the file descriptors of all
-// X connections to write to.
-//
-
-int Proxy::setWriteDescriptors(fd_set *fdSet, int &fdMax, T_timestamp &tsMax)
-{
- int fd = -1;
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL)
- {
- fd = getFd(channelId);
-
- if (transports_[channelId] -> length() > 0)
- {
- FD_SET(fd, fdSet);
-
- #ifdef TEST
- *logofs << "Proxy: Descriptor FD#" << fd << " selected "
- << "for write with blocked " << transports_[channelId] ->
- blocked() << " and length " << transports_[channelId] ->
- length() << ".\n" << logofs_flush;
- #endif
-
- if (fd >= fdMax)
- {
- fdMax = fd + 1;
- }
- }
- #ifdef TEST
- else
- {
- *logofs << "Proxy: Descriptor FD#" << fd << " not selected "
- << "for write with blocked " << transports_[channelId] ->
- blocked() << " and length " << transports_[channelId] ->
- length() << ".\n" << logofs_flush;
- }
- #endif
-
- #if defined(TEST) || defined(INFO)
-
- if (transports_[channelId] -> getType() !=
- transport_agent && transports_[channelId] ->
- length() > 0 && transports_[channelId] ->
- blocked() != 1)
- {
- *logofs << "Proxy: PANIC! Descriptor FD#" << fd
- << " has data to write but blocked flag is "
- << transports_[channelId] -> blocked()
- << ".\n" << logofs_flush;
-
- cerr << "Error" << ": Descriptor FD#" << fd
- << " has data to write but blocked flag is "
- << transports_[channelId] -> blocked()
- << ".\n";
-
- HandleCleanup();
- }
-
- #endif
- }
- }
-
- //
- // Check if the proxy transport has data
- // from a previous blocking write.
- //
-
- if (transport_ -> blocked() == 1)
- {
- FD_SET(fd_, fdSet);
-
- #ifdef TEST
- *logofs << "Proxy: Proxy descriptor FD#"
- << fd_ << " selected for write. Blocked is "
- << transport_ -> blocked() << " length is "
- << transport_ -> length() << ".\n"
- << logofs_flush;
- #endif
-
- if (fd_ >= fdMax)
- {
- fdMax = fd_ + 1;
- }
- }
- #ifdef TEST
- else
- {
- *logofs << "Proxy: Proxy descriptor FD#"
- << fd_ << " not selected for write. Blocked is "
- << transport_ -> blocked() << " length is "
- << transport_ -> length() << ".\n"
- << logofs_flush;
- }
- #endif
-
- //
- // We are entering the main select. Save
- // the timestamp of the last loop so that
- // we can detect the clock drifts.
- //
-
- timeouts_.loopTs = getTimestamp();
-
- return 1;
-}
-
-int Proxy::getChannels(T_channel_type type)
-{
- int channels = 0;
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL &&
- (type == channel_none ||
- type == channels_[channelId] ->
- getType()))
- {
- channels++;
- }
- }
-
- return channels;
-}
-
-T_channel_type Proxy::getType(int fd)
-{
- int channelId = getChannel(fd);
-
- if (channelId < 0 || channels_[channelId] == NULL)
- {
- return channel_none;
- }
-
- return channels_[channelId] -> getType();
-}
-
-const char *Proxy::getTypeName(T_channel_type type)
-{
- switch (type)
- {
- case channel_x11:
- {
- return "X";
- }
- case channel_cups:
- {
- return "CUPS";
- }
- case channel_smb:
- {
- return "SMB";
- }
- case channel_media:
- {
- return "media";
- }
- case channel_http:
- {
- return "HTTP";
- }
- case channel_font:
- {
- return "font";
- }
- case channel_slave:
- {
- return "slave";
- }
- default:
- {
- return "unknown";
- }
- }
-}
-
-const char *Proxy::getComputerName()
-{
- //
- // Strangely enough, under some Windows OSes SMB
- // service doesn't bind to localhost. Fall back
- // to localhost if can't find computer name in
- // the environment. In future we should try to
- // bind to localhost and then try the other IPs.
- //
-
- const char *hostname = NULL;
-
- #ifdef __CYGWIN32__
-
- hostname = getenv("COMPUTERNAME");
-
- #endif
-
- if (hostname == NULL)
- {
- hostname = "localhost";
- }
-
- return hostname;
-}
-
-//
-// Handle data from channels selected for read.
-//
-
-int Proxy::handleRead(int &resultFds, fd_set &readSet)
-{
- #ifdef DEBUG
- *logofs << "Proxy: Checking descriptors selected for read.\n"
- << logofs_flush;
- #endif
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- #ifdef DEBUG
- *logofs << "Proxy: Looping with current channel "
- << *j << ".\n" << logofs_flush;
- #endif
-
- int fd = getFd(*j);
-
- if (fd >= 0 && resultFds > 0 && FD_ISSET(fd, &readSet))
- {
- #ifdef DEBUG
- *logofs << "Proxy: Going to read messages from FD#"
- << fd << ".\n" << logofs_flush;
- #endif
-
- int result = handleRead(fd);
-
- if (result < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failure reading messages from FD#"
- << fd << ".\n" << logofs_flush;
- #endif
-
- return -1;
- }
-
- #ifdef DEBUG
- *logofs << "Proxy: Clearing the read descriptor "
- << "for FD#" << fd << ".\n" << logofs_flush;
- #endif
-
- FD_CLR(fd, &readSet);
-
- resultFds--;
- }
- }
-
- if (resultFds > 0 && FD_ISSET(fd_, &readSet))
- {
- #ifdef DEBUG
- *logofs << "Proxy: Going to read messages from "
- << "proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- if (handleRead() < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failure reading from proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- return -1;
- }
-
- #ifdef DEBUG
- *logofs << "Proxy: Clearing the read descriptor "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- FD_CLR(fd_, &readSet);
-
- resultFds--;
- }
-
- return 1;
-}
-
-//
-// Perform flush on descriptors selected for write.
-//
-
-int Proxy::handleFlush(int &resultFds, fd_set &writeSet)
-{
- #ifdef DEBUG
- *logofs << "Proxy: Checking descriptors selected for write.\n"
- << logofs_flush;
- #endif
-
- if (resultFds > 0 && FD_ISSET(fd_, &writeSet))
- {
- #ifdef TEST
- *logofs << "Proxy: FLUSH! Proxy descriptor FD#" << fd_
- << " reported to be writable.\n"
- << logofs_flush;
- #endif
-
- if (handleFlush() < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failure flushing the writable "
- << "proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- return -1;
- }
-
- #ifdef DEBUG
- *logofs << "Proxy: Clearing the write descriptor "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- FD_CLR(fd_, &writeSet);
-
- resultFds--;
- }
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- resultFds > 0 && j != channelList.end(); j++)
- {
- #ifdef DEBUG
- *logofs << "Proxy: Looping with current channel "
- << *j << ".\n" << logofs_flush;
- #endif
-
- int fd = getFd(*j);
-
- if (fd >= 0 && FD_ISSET(fd, &writeSet))
- {
- #ifdef TEST
- *logofs << "Proxy: X descriptor FD#" << fd
- << " reported to be writable.\n"
- << logofs_flush;
- #endif
-
- //
- // It can happen that, in handling reads, we
- // have destroyed the buffer associated to a
- // closed socket, so don't complain about
- // the errors.
- //
-
- handleFlush(fd);
-
- //
- // Clear the descriptor from the mask so
- // we don't confuse the agent if it's
- // not checking only its own descriptors.
- //
-
- #ifdef DEBUG
- *logofs << "Proxy: Clearing the write descriptor "
- << "for FD#" << fd << ".\n"
- << logofs_flush;
- #endif
-
- FD_CLR(fd, &writeSet);
-
- resultFds--;
- }
- }
-
- return 1;
-}
-
-int Proxy::handleRead()
-{
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Decoding data from proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- //
- // Decode all the available messages from
- // the remote proxy until is not possible
- // to read more.
- //
-
- for (;;)
- {
- int result = readBuffer_.readMessage();
-
- #if defined(TEST) || defined(DEBUG) || defined(INFO)
- *logofs << "Proxy: Read result on proxy FD#" << fd_
- << " is " << result << ".\n"
- << logofs_flush;
- #endif
-
- if (result < 0)
- {
- if (shutdown_ == 0)
- {
- if (finish_ == 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Failure reading from the "
- << "peer proxy on FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Failure reading from the "
- << "peer proxy.\n";
- }
- }
- #ifdef TEST
- else
- {
- *logofs << "Proxy: Closure of the proxy link detected "
- << "after clean shutdown.\n" << logofs_flush;
- }
- #endif
-
- priority_ = 0;
- finish_ = 1;
- congestion_ = 0;
-
- return -1;
- }
- else if (result == 0)
- {
- #if defined(TEST) || defined(DEBUG) || defined(INFO)
- *logofs << "Proxy: No data read from proxy FD#"
- << fd_ << "\n" << logofs_flush;
- #endif
-
- return 0;
- }
-
- //
- // We read some data from the remote. If we set
- // the congestion flag because we couldn't read
- // before the timeout and have tokens available,
- // then reset the congestion flag.
- //
-
- if (congestion_ == 1 &&
- tokens_[token_control].remaining > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Exiting congestion due to "
- << "proxy data with " << tokens_[token_control].remaining
- << " tokens.\n" << logofs_flush;
- #endif
-
- congestion_ = 0;
- }
-
- //
- // Set the timestamp of the last read
- // operation from the remote proxy and
- // enable again showing the 'no data
- // received' dialog at the next timeout.
- //
-
- timeouts_.readTs = getTimestamp();
-
- if (alert_ != 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Displacing the dialog "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- HandleAlert(DISPLACE_MESSAGE_ALERT, 1);
- }
-
- timeouts_.alertTs = nullTimestamp();
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Getting messages from proxy FD#" << fd_
- << " with " << readBuffer_.getLength() << " bytes "
- << "in the read buffer.\n" << logofs_flush;
- #endif
-
- unsigned int controlLength;
- unsigned int dataLength;
-
- const unsigned char *message;
-
- while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL)
- {
- statistics -> addFrameIn();
-
- if (controlLength == 3 && *message == 0 &&
- *(message + 1) < code_last_tag)
- {
- if (handleControlFromProxy(message) < 0)
- {
- return -1;
- }
- }
- else if (operation_ == operation_in_messages)
- {
- int channelId = inputChannel_;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Identified message of " << dataLength
- << " bytes for FD#" << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- if (channelId >= 0 && channelId < CONNECTIONS_LIMIT &&
- channels_[channelId] != NULL)
- {
- int finish = channels_[channelId] -> getFinish();
-
- #ifdef WARNING
-
- if (finish == 1)
- {
- *logofs << "Proxy: WARNING! Handling data for finishing "
- << "FD#" << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- }
-
- #endif
-
- //
- // We need to decode all the data to preserve
- // the consistency of the cache, so can't re-
- // turn as soon as the first error is encount-
- // ered. Check if this is the first time that
- // the failure is detected.
- //
-
- int result = channels_[channelId] -> handleWrite(message, dataLength);
-
- if (result < 0 && finish == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failed to write proxy data to FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
- }
-
- //
- // Check if we have splits or motion
- // events to send.
- //
-
- setSplitTimeout(channelId);
- setMotionTimeout(channelId);
- }
- #ifdef WARNING
- else
- {
- *logofs << "Proxy: WARNING! Received data for "
- << "invalid channel ID#" << channelId
- << ".\n" << logofs_flush;
- }
- #endif
- }
- else if (operation_ == operation_in_statistics)
- {
- #ifdef TEST
- *logofs << "Proxy: Received statistics data from remote proxy.\n"
- << logofs_flush;
- #endif
-
- if (handleStatisticsFromProxy(message, dataLength) < 0)
- {
- return -1;
- }
-
- operation_ = operation_in_messages;
- }
- else if (operation_ == operation_in_negotiation)
- {
- #ifdef TEST
- *logofs << "Proxy: Received new negotiation data from remote proxy.\n"
- << logofs_flush;
- #endif
-
- if (handleNegotiationFromProxy(message, dataLength) < 0)
- {
- return -1;
- }
- }
-
- //
- // if (controlLength == 3 && *message == 0 && ...) ...
- // else if (operation_ == operation_in_statistics) ...
- // else if (operation_ == operation_in_messages) ...
- // else if (operation_ == operation_in_negotiation) ...
- // else ...
- //
-
- else
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Unrecognized message received on proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Unrecognized message received on proxy FD#"
- << fd_ << ".\n";
-
- return -1;
- }
-
- } // while ((message = readBuffer_.getMessage(controlLength, dataLength)) != NULL) ...
-
- //
- // Reset the read buffer.
- //
-
- readBuffer_.fullReset();
-
- //
- // Give up if no data is readable.
- //
-
- if (transport_ -> readable() == 0)
- {
- break;
- }
-
- } // End of for (;;) ...
-
- return 1;
-}
-
-int Proxy::handleControlFromProxy(const unsigned char *message)
-{
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Received message '" << DumpControl(*(message + 1))
- << "' at " << strMsTimestamp() << " with data ID#"
- << (int) *(message + 2) << ".\n" << logofs_flush;
- #endif
-
- T_channel_type channelType = channel_none;
-
- switch (*(message + 1))
- {
- case code_switch_connection:
- {
- int channelId = *(message + 2);
-
- //
- // If channel is invalid further messages will
- // be ignored. The acknowledged shutdown of
- // channels should prevent this.
- //
-
- inputChannel_ = channelId;
-
- break;
- }
- case code_begin_congestion:
- {
- //
- // Set the congestion state for the
- // channel reported by the remote.
- //
-
- int channelId = *(message + 2);
-
- if (channels_[channelId] != NULL)
- {
- congestions_[channelId] = 1;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Received a begin congestion "
- << "for channel id ID#" << channelId
- << ".\n" << logofs_flush;
- #endif
-
- if (channelId == agent_ && congestions_[agent_] != 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Forcing an update of the congestion "
- << "counter with agent congested.\n"
- << logofs_flush;
- #endif
-
- statistics -> updateCongestion(-tokens_[token_control].remaining,
- tokens_[token_control].limit);
- }
- }
- #ifdef WARNING
- else
- {
- *logofs << "Proxy: WARNING! Received a begin congestion "
- << "for invalid channel id ID#" << channelId
- << ".\n" << logofs_flush;
- }
- #endif
-
- break;
- }
- case code_end_congestion:
- {
- //
- // Attend again to the channel.
- //
-
- int channelId = *(message + 2);
-
- if (channels_[channelId] != NULL)
- {
- congestions_[channelId] = 0;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Received an end congestion "
- << "for channel id ID#" << channelId
- << ".\n" << logofs_flush;
- #endif
-
- if (channelId == agent_ && congestions_[agent_] != 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Forcing an update of the congestion "
- << "counter with agent decongested.\n"
- << logofs_flush;
- #endif
-
- statistics -> updateCongestion(tokens_[token_control].remaining,
- tokens_[token_control].limit);
- }
- }
- #ifdef WARNING
- else
- {
- *logofs << "Proxy: WARNING! Received an end congestion "
- << "for invalid channel id ID#" << channelId
- << ".\n" << logofs_flush;
- }
- #endif
-
- break;
- }
- case code_control_token_request:
- {
- T_proxy_token &token = tokens_[token_control];
-
- if (handleTokenFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_split_token_request:
- {
- T_proxy_token &token = tokens_[token_split];
-
- if (handleTokenFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_data_token_request:
- {
- T_proxy_token &token = tokens_[token_data];
-
- if (handleTokenFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_control_token_reply:
- {
- T_proxy_token &token = tokens_[token_control];
-
- if (handleTokenReplyFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_split_token_reply:
- {
- T_proxy_token &token = tokens_[token_split];
-
- if (handleTokenReplyFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_data_token_reply:
- {
- T_proxy_token &token = tokens_[token_data];
-
- if (handleTokenReplyFromProxy(token, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_new_x_connection:
- {
- //
- // Opening the channel is handled later.
- //
-
- channelType = channel_x11;
-
- break;
- }
- case code_new_cups_connection:
- {
- channelType = channel_cups;
-
- break;
- }
- case code_new_aux_connection:
- {
- //
- // Starting from version 1.5.0 we create real X
- // connections for the keyboard channel. We need
- // to refuse old auxiliary X connections because
- // they would be unable to leverage the new fake
- // authorization cookie.
- //
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Can't open outdated auxiliary X "
- << "channel for code " << *(message + 1) << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Can't open outdated auxiliary X "
- << "channel for code " << *(message + 1) << ".\n";
-
- if (handleControl(code_drop_connection, *(message + 2)) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_new_smb_connection:
- {
- channelType = channel_smb;
-
- break;
- }
- case code_new_media_connection:
- {
- channelType = channel_media;
-
- break;
- }
- case code_new_http_connection:
- {
- channelType = channel_http;
-
- break;
- }
- case code_new_font_connection:
- {
- channelType = channel_font;
-
- break;
- }
- case code_new_slave_connection:
- {
- channelType = channel_slave;
-
- break;
- }
- case code_drop_connection:
- {
- int channelId = *(message + 2);
-
- if (channelId >= 0 && channelId < CONNECTIONS_LIMIT &&
- channels_[channelId] != NULL)
- {
- handleDropFromProxy(channelId);
- }
- #ifdef WARNING
- else
- {
- *logofs << "Proxy: WARNING! Received a drop message "
- << "for invalid channel id ID#" << channelId
- << ".\n" << logofs_flush;
- }
- #endif
-
- break;
- }
- case code_finish_connection:
- {
- int channelId = *(message + 2);
-
- if (channelId >= 0 && channelId < CONNECTIONS_LIMIT &&
- channels_[channelId] != NULL)
- {
- //
- // Force the finish state on the channel.
- // We can receive this message while in
- // the read loop, so we only mark the
- // channel for deletion.
- //
-
- #ifdef TEST
- *logofs << "Proxy: Received a finish message for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- handleFinishFromProxy(channelId);
- }
- #ifdef WARNING
- else
- {
- *logofs << "Proxy: WARNING! Received a finish message "
- << "for invalid channel id ID#" << channelId
- << ".\n" << logofs_flush;
- }
- #endif
-
- break;
- }
- case code_finish_listeners:
- {
- //
- // This is from the main loop.
- //
-
- #ifdef TEST
- *logofs << "Proxy: Closing down all local listeners.\n"
- << logofs_flush;
- #endif
-
- CleanupListeners();
-
- finish_ = 1;
-
- break;
- }
- case code_reset_request:
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Proxy reset not supported "
- << "in this version.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Proxy reset not supported "
- << "in this version.\n";
-
- HandleCleanup();
- }
- case code_shutdown_request:
- {
- //
- // Time to rest in peace.
- //
-
- shutdown_ = 1;
-
- break;
- }
- case code_load_request:
- {
- if (handleLoadFromProxy() < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_save_request:
- {
- //
- // Don't abort the connection
- // if can't write to disk.
- //
-
- handleSaveFromProxy();
-
- break;
- }
- case code_statistics_request:
- {
- int type = *(message + 2);
-
- if (handleStatisticsFromProxy(type) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_statistics_reply:
- {
- operation_ = operation_in_statistics;
-
- break;
- }
- case code_alert_request:
- {
- HandleAlert(*(message + 2), 1);
-
- break;
- }
- case code_sync_request:
- {
- int channelId = *(message + 2);
-
- if (handleSyncFromProxy(channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- case code_sync_reply:
- {
- //
- // We are not the one that issued
- // the request.
- //
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: PANIC! Received an unexpected "
- << "synchronization reply.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Received an unexpected "
- << "synchronization reply.\n";
-
- HandleCleanup();
- }
- default:
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Received bad control message number "
- << (unsigned int) *(message + 1) << " with attribute "
- << (unsigned int) *(message + 2) << ".\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Received bad control message number "
- << (unsigned int) *(message + 1) << " with attribute "
- << (unsigned int) *(message + 2) << ".\n";
-
- HandleCleanup();
- }
-
- } // End of switch (*(message + 1)) ...
-
- if (channelType == channel_none)
- {
- return 1;
- }
-
- //
- // Handle the channel allocation that we
- // left from the main switch case.
- //
-
- int channelId = *(message + 2);
-
- //
- // Check if the channel has been dropped.
- //
-
- if (channels_[channelId] != NULL &&
- (channels_[channelId] -> getDrop() == 1 ||
- channels_[channelId] -> getClosing() == 1))
- {
- #ifdef TEST
- *logofs << "Proxy: Dropping the descriptor FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- handleDrop(channelId);
- }
-
- //
- // Check if the channel is in the valid
- // range.
- //
-
- int result = checkChannelMap(channelId);
-
- if (result >= 0)
- {
- result = handleNewConnectionFromProxy(channelType, channelId);
- }
-
- if (result < 0)
- {
- //
- // Realization of new channel failed.
- // Send channel shutdown message to
- // the peer proxy.
- //
-
- if (handleControl(code_drop_connection, channelId) < 0)
- {
- return -1;
- }
- }
- else
- {
- int fd = getFd(channelId);
-
- if (getReadable(fd) > 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Trying to read immediately "
- << "from descriptor FD#" << fd << ".\n"
- << logofs_flush;
- #endif
-
- if (handleRead(fd) < 0)
- {
- return -1;
- }
- }
- #ifdef TEST
- *logofs << "Proxy: Nothing to read immediately "
- << "from descriptor FD#" << fd << ".\n"
- << logofs_flush;
- #endif
- }
-
- return 1;
-}
-
-int Proxy::handleRead(int fd, const char *data, int size)
-{
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Handling data for connection on FD#"
- << fd << ".\n" << logofs_flush;
- #endif
-
- if (canRead(fd) == 0)
- {
- #if defined(TEST) || defined(INFO)
-
- if (getChannel(fd) < 0)
- {
- *logofs << "Proxy: PANIC! Can't read from invalid FD#"
- << fd << ".\n" << logofs_flush;
-
- HandleCleanup();
- }
- else
- {
- *logofs << "Proxy: WARNING! Read method called for FD#"
- << fd << " but operation is not possible.\n"
- << logofs_flush;
- }
-
- #endif
-
- return 0;
- }
-
- int channelId = getChannel(fd);
-
- //
- // Let the channel object read all the new data from
- // its file descriptor, isolate messages, compress
- // those messages, and append the compressed form to
- // the encode buffer.
- //
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Reading messages from FD#" << fd
- << " channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- int result = channels_[channelId] -> handleRead(encodeBuffer_, (const unsigned char *) data,
- (unsigned int) size);
-
- //
- // Even in the case of a failure, write the produced
- // data to the proxy connection. To keep the stores
- // synchronized, the remote side needs to decode any
- // message encoded by this side, also if the X socket
- // was closed in the meanwhile. If this is the case,
- // the decompressed output will be silently discarded.
- //
-
- if (result < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failed to read data from connection FD#"
- << fd << " channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
- }
-
- //
- // Check if there are new splits or
- // motion events to send.
- //
-
- setSplitTimeout(channelId);
- setMotionTimeout(channelId);
-
- return 1;
-}
-
-int Proxy::handleEvents()
-{
- #ifdef TEST
- *logofs << "Proxy: Going to check the events on channels.\n"
- << logofs_flush;
- #endif
-
- //
- // Check if we can safely write to the
- // proxy link.
- //
-
- int read = isTimeToRead();
-
- //
- // Loop on channels and send the pending
- // events. We must copy the list because
- // channels can be removed in the middle
- // of the loop.
- //
-
- T_list channelList = activeChannels_.copyList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] == NULL)
- {
- continue;
- }
-
- //
- // Check if we need to drop the channel.
- //
-
- if (channels_[channelId] -> getDrop() == 1 ||
- channels_[channelId] -> getClosing() == 1)
- {
- #ifdef TEST
- *logofs << "Proxy: Dropping the descriptor FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- if (handleDrop(channelId) < 0)
- {
- return -1;
- }
-
- continue;
- }
- else if (channels_[channelId] -> getFinish() == 1)
- {
- #ifdef TEST
- *logofs << "Proxy: Skipping finishing "
- << "descriptor FD#" << getFd(channelId)
- << " channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- continue;
- }
-
- //
- // If the proxy link or the channel is
- // in congestion state, don't handle
- // the further events.
- //
-
- if (read == 0 || congestions_[channelId] == 1)
- {
- #ifdef TEST
-
- if (read == 0)
- {
- *logofs << "Proxy: Can't handle events for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " with proxy not available.\n"
- << logofs_flush;
- }
- else
- {
- *logofs << "Proxy: Can't handle events for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " with channel congested.\n"
- << logofs_flush;
- }
-
- #endif
-
- continue;
- }
-
- //
- // Handle the timeouts on the channel
- // operations.
- //
-
- int result = 0;
-
- //
- // Handle the motion events.
- //
-
- if (result >= 0 && channels_[channelId] -> needMotion() == 1)
- {
- if (isTimeToMotion() == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
-
- *logofs << "Proxy: FLUSH! Motion timeout expired after "
- << diffTimestamp(timeouts_.motionTs, getTimestamp())
- << " Ms.\n" << logofs_flush;
-
- #endif
-
- result = channels_[channelId] -> handleMotion(encodeBuffer_);
-
- #ifdef TEST
-
- if (result < 0)
- {
- *logofs << "Proxy: Failed to handle motion events for FD#"
- << getFd(channelId) << " channel ID#" << channelId
- << ".\n" << logofs_flush;
- }
-
- #endif
-
- timeouts_.motionTs = nullTimestamp();
-
- setMotionTimeout(channelId);
- }
- #if defined(TEST) || defined(INFO)
- else if (isTimestamp(timeouts_.motionTs) == 1)
- {
- *logofs << "Proxy: Running with "
- << diffTimestamp(timeouts_.motionTs, getTimestamp())
- << " Ms elapsed since the last motion.\n"
- << logofs_flush;
- }
- #endif
- }
-
- if (result >= 0 && channels_[channelId] -> needSplit() == 1)
- {
- //
- // Check if it is time to send more splits
- // and how many bytes are going to be sent.
- //
-
- if (isTimeToSplit() == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- *logofs << "Proxy: SPLIT! Split timeout expired after "
- << diffTimestamp(timeouts_.splitTs, getTimestamp())
- << " Ms.\n" << logofs_flush;
- #endif
-
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
-
- *logofs << "Proxy: SPLIT! Encoding splits for FD#"
- << getFd(channelId) << " at " << strMsTimestamp()
- << " with " << clientStore_ -> getSplitTotalStorageSize()
- << " total bytes and " << control -> SplitDataPacketLimit
- << " bytes " << "to write.\n"
- << logofs_flush;
-
- #endif
-
- result = channels_[channelId] -> handleSplit(encodeBuffer_);
-
- #ifdef TEST
-
- if (result < 0)
- {
- *logofs << "Proxy: Failed to handle splits for FD#"
- << getFd(channelId) << " channel ID#" << channelId
- << ".\n" << logofs_flush;
- }
-
- #endif
-
- timeouts_.splitTs = nullTimestamp();
-
- setSplitTimeout(channelId);
- }
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- else if (channels_[channelId] -> needSplit() == 1 &&
- isTimestamp(timeouts_.splitTs) == 0)
- {
- *logofs << "Proxy: SPLIT! WARNING! Channel for FD#"
- << getFd(channelId) << " has split to send but "
- << "there is no timeout.\n" << logofs_flush;
- }
- else if (isTimestamp(timeouts_.splitTs) == 1)
- {
- *logofs << "Proxy: SPLIT! Running with "
- << diffTimestamp(timeouts_.splitTs, getTimestamp())
- << " Ms elapsed since the last split.\n"
- << logofs_flush;
- }
- #endif
- }
-
- if (result < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Error handling events for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
- }
- }
-
- return 1;
-}
-
-int Proxy::handleFrame(T_frame_type type)
-{
- //
- // Write any outstanding control message, followed by the
- // content of the encode buffer, to the proxy transport.
- //
- // This code assumes that the encode buffer data is at an
- // offset several bytes from start of the buffer, so that
- // the length header and any necessary control bytes can
- // be inserted in front of the data already in the buffer.
- // This is the easiest way to encapsulate header and data
- // together in a single frame.
- //
- // The way framing is implemented is very efficient but
- // inherently limited and does not allow for getting the
- // best performance, especially when running over a fast
- // link. Framing should be rewritten to include the length
- // of the packets in a fixed size header and, possibly,
- // to incapsulate the control messages and the channel's
- // data in a pseudo X protocol message, so that the proxy
- // itself would be treated like any other channel.
- //
-
- #if defined(TEST) || defined(INFO)
-
- if (congestion_ == 1)
- {
- //
- // This can happen because there may be control
- // messages to send, like a proxy shutdown mes-
- // sage or a statistics request. All the other
- // cases should be considered an error.
- //
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Data is to be sent while "
- << "congestion is " << congestion_ << ".\n"
- << logofs_flush;
- #endif
- }
-
- #endif
-
- //
- // Check if there is any data available on
- // the socket. Recent Linux kernels are very
- // picky. They require that we read often or
- // they assume that the process is non-inter-
- // active.
- //
-
- if (handleAsyncEvents() < 0)
- {
- return -1;
- }
-
- //
- // Check if this is a ping, not a data frame.
- //
-
- if (type == frame_ping)
- {
- if (handleToken(frame_ping) < 0)
- {
- return -1;
- }
- }
-
- unsigned int dataLength = encodeBuffer_.getLength();
-
- #ifdef DEBUG
- *logofs << "Proxy: Data length is " << dataLength
- << " control length is " << controlLength_
- << ".\n" << logofs_flush;
- #endif
-
- if (dataLength > 0)
- {
- //
- // If this is a generic channel we need
- // to add the completion bits. Data can
- // also have been encoded because of a
- // statistics request, even if no output
- // channel was currently selected.
- //
-
- if (outputChannel_ != -1)
- {
- #if defined(TEST) || defined(INFO)
-
- if (channels_[outputChannel_] == NULL)
- {
- *logofs << "Proxy: PANIC! A new frame was requested "
- << "but the channel is invalid.\n"
- << logofs_flush;
-
- HandleCleanup();
- }
-
- #endif
-
- channels_[outputChannel_] -> handleCompletion(encodeBuffer_);
-
- dataLength = encodeBuffer_.getLength();
- }
- }
- else if (controlLength_ == 0)
- {
- #if defined(TEST) || defined(INFO)
-
- *logofs << "Proxy: PANIC! A new frame was requested "
- << "but there is no data to write.\n"
- << logofs_flush;
-
- HandleCleanup();
-
- #endif
-
- return 0;
- }
-
- #ifdef DEBUG
- *logofs << "Proxy: Data length is now " << dataLength
- << " control length is " << controlLength_
- << ".\n" << logofs_flush;
- #endif
-
- //
- // Check if this frame needs to carry a new
- // token request.
- //
-
- if (type == frame_data)
- {
- if (handleToken(frame_data) < 0)
- {
- return -1;
- }
- }
-
- #ifdef DEBUG
- *logofs << "Proxy: Adding a new frame for the remote proxy.\n"
- << logofs_flush;
- #endif
-
- unsigned char temp[5];
-
- unsigned int lengthLength = 0;
- unsigned int shift = dataLength;
-
- while (shift)
- {
- temp[lengthLength++] = (unsigned char) (shift & 0x7f);
-
- shift >>= 7;
- }
-
- unsigned char *data = encodeBuffer_.getData();
-
- unsigned char *outputMessage = data - (controlLength_ + lengthLength);
-
- unsigned char *nextDest = outputMessage;
-
- for (int i = 0; i < controlLength_; i++)
- {
- *nextDest++ = controlCodes_[i];
- }
-
- for (int j = lengthLength - 1; j > 0; j--)
- {
- *nextDest++ = (temp[j] | 0x80);
- }
-
- if (lengthLength)
- {
- *nextDest++ = temp[0];
- }
-
- unsigned int outputLength = dataLength + controlLength_ + lengthLength;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Produced plain output for " << dataLength << "+"
- << controlLength_ << "+" << lengthLength << " out of "
- << outputLength << " bytes.\n" << logofs_flush;
- #endif
-
- #if defined(TEST) || defined(INFO) || defined(FLUSH) || defined(TIME)
-
- T_timestamp nowTs = getTimestamp();
-
- *logofs << "Proxy: FLUSH! Immediate with blocked " << transport_ ->
- blocked() << " length " << transport_ -> length()
- << " new " << outputLength << " flushable " << transport_ ->
- flushable() << " tokens " << tokens_[token_control].remaining
- << " after " << diffTimestamp(timeouts_.writeTs, nowTs)
- << " Ms.\n" << logofs_flush;
-
- *logofs << "Proxy: FLUSH! Immediate flush to proxy FD#" << fd_
- << " of " << outputLength << " bytes at " << strMsTimestamp()
- << " with priority " << priority_ << ".\n" << logofs_flush;
-
- *logofs << "Proxy: FLUSH! Current bitrate is "
- << statistics -> getBitrateInShortFrame() << " with "
- << statistics -> getBitrateInLongFrame() << " in the "
- << "long frame and top " << statistics ->
- getTopBitrate() << ".\n" << logofs_flush;
- #endif
-
- statistics -> addWriteOut();
-
- int result = transport_ -> write(write_immediate, outputMessage, outputLength);
-
- #ifdef TIME
-
- if (diffTimestamp(timeouts_.writeTs, nowTs) > 50)
- {
- *logofs << "Proxy: WARNING! TIME! Data written to proxy FD#"
- << fd_ << " at " << strMsTimestamp() << " after "
- << diffTimestamp(timeouts_.writeTs, nowTs)
- << " Ms.\n" << logofs_flush;
- }
-
- #endif
-
- #ifdef DUMP
- *logofs << "Proxy: Sent " << outputLength << " bytes of data "
- << "with checksum ";
-
- DumpChecksum(outputMessage, outputLength);
-
- *logofs << " on proxy FD#" << fd_ << ".\n" << logofs_flush;
- #endif
-
- #ifdef DUMP
- *logofs << "Proxy: Partial checksums are:\n";
-
- DumpBlockChecksums(outputMessage, outputLength, 256);
-
- *logofs << logofs_flush;
- #endif
-
- //
- // Clean up the encode buffer and
- // bring it to the initial size.
- //
-
- encodeBuffer_.fullReset();
-
- //
- // Close the connection if we got
- // an error.
- //
-
- if (result < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failed write to proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- return -1;
- }
-
- //
- // Account for the data frame and the
- // framing overhead.
- //
-
- if (dataLength > 0)
- {
- statistics -> addFrameOut();
- }
-
- statistics -> addFramingBits((controlLength_ + lengthLength) << 3);
-
- controlLength_ = 0;
-
- //
- // Reset all buffers, counters and the
- // priority flag.
- //
-
- handleResetFlush();
-
- //
- // Check if more data became available
- // after writing.
- //
-
- if (handleAsyncEvents() < 0)
- {
- return -1;
- }
-
- //
- // Drain the proxy link if we are in
- // congestion state.
- //
- // if (needDrain() == 1 && draining_ == 0)
- // {
- // if (handleDrain() < 0)
- // {
- // return -1;
- // }
- // }
- //
-
- return result;
-}
-
-int Proxy::handleFlush()
-{
- //
- // We can have data in the encode buffer or
- // control bytes to send. In the case make
- // up a new frame.
- //
-
- if (encodeBuffer_.getLength() + controlLength_ > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Flushing data in the encode buffer.\n"
- << logofs_flush;
- #endif
-
- priority_ = 1;
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- //
- // Check if we have something to write.
- //
-
- if (transport_ -> length() + transport_ -> flushable() == 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Nothing else to flush for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- return 0;
- }
-
- #if defined(TEST) || defined(INFO)
-
- if (transport_ -> blocked() == 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Proxy descriptor FD#" << fd_
- << " has data to flush but the transport "
- << "is not blocked.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Proxy descriptor FD#" << fd_
- << " has data to flush but the transport "
- << "is not blocked.\n";
-
- HandleCleanup();
- }
-
- #endif
-
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: FLUSH! Deferred with blocked " << transport_ ->
- blocked() << " length " << transport_ -> length()
- << " flushable " << transport_ -> flushable() << " tokens "
- << tokens_[token_control].remaining << ".\n"
- << logofs_flush;
-
- *logofs << "Proxy: FLUSH! Deferred flush to proxy FD#" << fd_
- << " of " << transport_ -> length() + transport_ ->
- flushable() << " bytes at " << strMsTimestamp()
- << " with priority " << priority_ << ".\n"
- << logofs_flush;
-
- *logofs << "Proxy: FLUSH! Current bitrate is "
- << statistics -> getBitrateInShortFrame() << " with "
- << statistics -> getBitrateInLongFrame() << " in the "
- << "long frame and top " << statistics ->
- getTopBitrate() << ".\n" << logofs_flush;
- #endif
-
- statistics -> addWriteOut();
-
- int result = transport_ -> flush();
-
- if (result < 0)
- {
- return -1;
- }
-
- //
- // Reset the counters and update the
- // timestamp of the last write.
- //
-
- handleResetFlush();
-
- return result;
-}
-
-int Proxy::handleDrain()
-{
- //
- // If the proxy is run in the same process
- // as SSH, we can't block or the program
- // would not have a chance to read or write
- // its data.
- //
-
- if (control -> LinkEncrypted == 1)
- {
- return 0;
- }
-
- if (needDrain() == 0 || draining_ == 1)
- {
- #if defined(TEST) || defined(INFO)
-
- if (draining_ == 1)
- {
- *logofs << "Proxy: WARNING! Already draining proxy FD#"
- << fd_ << " at " << strMsTimestamp() << ".\n"
- << logofs_flush;
- }
- else
- {
- *logofs << "Proxy: WARNING! No need to drain proxy FD#"
- << fd_ << " with congestion " << congestion_
- << " length " << transport_ -> length()
- << " and blocked " << transport_ -> blocked()
- << ".\n" << logofs_flush;
- }
-
- #endif
-
- return 0;
- }
-
- draining_ = 1;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Going to drain the proxy FD#" << fd_
- << " at " << strMsTimestamp() << ".\n"
- << logofs_flush;
- #endif
-
- int timeout = control -> PingTimeout / 2;
-
- T_timestamp startTs = getNewTimestamp();
-
- T_timestamp nowTs = startTs;
-
- int remaining;
- int result;
-
- //
- // Keep draining the proxy socket while
- // reading the incoming messages until
- // the timeout is expired.
- //
-
- for (;;)
- {
- remaining = timeout - diffTimestamp(startTs, nowTs);
-
- if (remaining <= 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Timeout raised while draining "
- << "FD#" << fd_ << " at " << strMsTimestamp()
- << " after " << diffTimestamp(startTs, nowTs)
- << " Ms.\n" << logofs_flush;
- #endif
-
- result = 0;
-
- goto ProxyDrainEnd;
- }
-
- if (transport_ -> length() > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Trying to write to FD#" << fd_
- << " at " << strMsTimestamp() << " with length "
- << transport_ -> length() << " and "
- << remaining << " Ms remaining.\n"
- << logofs_flush;
- #endif
-
- result = transport_ -> drain(0, remaining);
-
- if (result == -1)
- {
- result = -1;
-
- goto ProxyDrainEnd;
- }
- else if (result == 0 && transport_ -> readable() > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Decoding more data from proxy FD#"
- << fd_ << " at " << strMsTimestamp() << " with "
- << transport_ -> length() << " bytes to write and "
- << transport_ -> readable() << " readable.\n"
- << logofs_flush;
- #endif
-
- if (handleRead() < 0)
- {
- result = -1;
-
- goto ProxyDrainEnd;
- }
- }
- #if defined(TEST) || defined(INFO)
- else if (result == 1)
- {
- *logofs << "Proxy: Transport for proxy FD#" << fd_
- << " drained down to " << transport_ -> length()
- << " bytes.\n" << logofs_flush;
- }
- #endif
- }
- else
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Waiting for more data from proxy "
- << "FD#" << fd_ << " at " << strMsTimestamp()
- << " with " << remaining << " Ms remaining.\n"
- << logofs_flush;
- #endif
-
-
- result = transport_ -> wait(remaining);
-
- if (result == -1)
- {
- result = -1;
-
- goto ProxyDrainEnd;
- }
- else if (result > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Decoding more data from proxy FD#"
- << fd_ << " at " << strMsTimestamp() << " with "
- << transport_ -> readable() << " bytes readable.\n"
- << logofs_flush;
- #endif
-
- if (handleRead() < 0)
- {
- result = -1;
-
- goto ProxyDrainEnd;
- }
- }
- }
-
- //
- // Check if we finally got the tokens
- // that would allow us to come out of
- // the congestion state.
- //
-
- if (needDrain() == 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Got decongestion for proxy FD#"
- << fd_ << " at " << strMsTimestamp() << " after "
- << diffTimestamp(startTs, getTimestamp())
- << " Ms.\n" << logofs_flush;
- #endif
-
- result = 1;
-
- goto ProxyDrainEnd;
- }
-
- nowTs = getNewTimestamp();
- }
-
-ProxyDrainEnd:
-
- draining_ = 0;
-
- return result;
-}
-
-int Proxy::handleFlush(int fd)
-{
- int channelId = getChannel(fd);
-
- if (channelId < 0 || channels_[channelId] == NULL)
- {
- #ifdef TEST
- *logofs << "Proxy: WARNING! Skipping flush on invalid "
- << "descriptor FD#" << fd << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- return 0;
- }
- else if (channels_[channelId] -> getFinish() == 1)
- {
- #ifdef TEST
- *logofs << "Proxy: Skipping flush on finishing "
- << "descriptor FD#" << fd << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- return 0;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Going to flush FD#" << fd
- << " with blocked " << transports_[channelId] -> blocked()
- << " length " << transports_[channelId] -> length()
- << ".\n" << logofs_flush;
- #endif
-
- if (channels_[channelId] -> handleFlush() < 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Failed to flush data to FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- handleFinish(channelId);
-
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleStatistics(int type, ostream *stream)
-{
- if (stream == NULL || control -> EnableStatistics == 0)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Cannot produce statistics "
- << " for proxy FD#" << fd_ << ". Invalid settings "
- << "for statistics or stream.\n" << logofs_flush;
- #endif
-
- return 0;
- }
- else if (currentStatistics_ != NULL)
- {
- //
- // Need to update the stream pointer as the
- // previous one could have been destroyed.
- //
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Replacing stream while producing "
- << "statistics in stream at " << currentStatistics_
- << " for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
- }
-
- currentStatistics_ = stream;
-
- //
- // Get statistics of remote peer.
- //
-
- if (handleControl(code_statistics_request, type) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleStatisticsFromProxy(int type)
-{
- if (needFlush() == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: WARNING! Data for the previous "
- << "channel ID#" << outputChannel_
- << " flushed in statistics.\n"
- << logofs_flush;
- #endif
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- if (control -> EnableStatistics == 1)
- {
- //
- // Allocate a buffer for the output.
- //
-
- char *buffer = new char[STATISTICS_LENGTH];
-
- *buffer = '\0';
-
- if (control -> ProxyMode == proxy_client)
- {
- #ifdef TEST
- *logofs << "Proxy: Producing "
- << (type == TOTAL_STATS ? "total" : "partial")
- << " client statistics for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- statistics -> getClientProtocolStats(type, buffer);
-
- statistics -> getClientOverallStats(type, buffer);
- }
- else
- {
- #ifdef TEST
- *logofs << "Proxy: Producing "
- << (type == TOTAL_STATS ? "total" : "partial")
- << " server statistics for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- statistics -> getServerProtocolStats(type, buffer);
- }
-
- if (type == PARTIAL_STATS)
- {
- statistics -> resetPartialStats();
- }
-
- unsigned int length = strlen((char *) buffer) + 1;
-
- encodeBuffer_.encodeValue(type, 8);
-
- encodeBuffer_.encodeValue(length, 32);
-
- #ifdef TEST
- *logofs << "Proxy: Encoding " << length
- << " bytes of statistics data for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- encodeBuffer_.encodeMemory((unsigned char *) buffer, length);
-
- //
- // Account statistics data as framing bits.
- //
-
- statistics -> addFramingBits(length << 3);
-
- delete [] buffer;
- }
- else
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Got statistics request "
- << "but local statistics are disabled.\n"
- << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Got statistics request "
- << "but local statistics are disabled.\n";
-
- type = NO_STATS;
-
- encodeBuffer_.encodeValue(type, 8);
-
- #ifdef TEST
- *logofs << "Proxy: Sending error code to remote proxy on FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
- }
-
- //
- // The next write will flush the statistics
- // data and the control message.
- //
-
- if (handleControl(code_statistics_reply, type) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleStatisticsFromProxy(const unsigned char *message, unsigned int length)
-{
- if (currentStatistics_ == NULL)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Unexpected statistics data received "
- << "from remote proxy on FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Unexpected statistics data received "
- << "from remote proxy.\n";
-
- return 0;
- }
-
- //
- // Allocate the decode buffer and at least
- // the 'type' field to see if there was an
- // error.
- //
-
- DecodeBuffer decodeBuffer(message, length);
-
- unsigned int type;
-
- decodeBuffer.decodeValue(type, 8);
-
- if (type == NO_STATS)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Couldn't get statistics from remote "
- << "proxy on FD#" << fd_ << ".\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Couldn't get statistics from remote proxy.\n";
- }
- else if (type != TOTAL_STATS && type != PARTIAL_STATS)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Cannot produce statistics "
- << "with qualifier '" << type << "'.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Cannot produce statistics "
- << "with qualifier '" << type << "'.\n";
-
- return -1;
- }
- else
- {
- unsigned int size;
-
- decodeBuffer.decodeValue(size, 32);
-
- char *buffer = new char[STATISTICS_LENGTH];
-
- *buffer = '\0';
-
- if (control -> EnableStatistics == 1)
- {
- if (control -> ProxyMode == proxy_client)
- {
- #ifdef TEST
- *logofs << "Proxy: Finalizing "
- << (type == TOTAL_STATS ? "total" : "partial")
- << " client statistics for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- statistics -> getClientCacheStats(type, buffer);
-
- #ifdef TEST
- *logofs << "Proxy: Decoding " << size
- << " bytes of statistics data for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size);
-
- statistics -> getClientProtocolStats(type, buffer);
-
- statistics -> getClientOverallStats(type, buffer);
- }
- else
- {
- #ifdef TEST
- *logofs << "Proxy: Finalizing "
- << (type == TOTAL_STATS ? "total" : "partial")
- << " server statistics for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- statistics -> getServerCacheStats(type, buffer);
-
- statistics -> getServerProtocolStats(type, buffer);
-
- #ifdef TEST
- *logofs << "Proxy: Decoding " << size
- << " bytes of statistics data for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size);
- }
-
- if (type == PARTIAL_STATS)
- {
- statistics -> resetPartialStats();
- }
-
- *currentStatistics_ << buffer;
-
- //
- // Mark the end of text to help external parsing.
- //
-
- *currentStatistics_ << '\4';
-
- *currentStatistics_ << flush;
- }
- else
- {
- //
- // It can be that statistics were enabled at the time
- // we issued the request (otherwise we could not have
- // set the stream), but now they have been disabled
- // by user. We must decode statistics data if we want
- // to keep the connection.
- //
-
- #ifdef TEST
- *logofs << "Proxy: Discarding " << size
- << " bytes of statistics data for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- strncat(buffer, (char *) decodeBuffer.decodeMemory(size), size);
- }
-
- delete [] buffer;
- }
-
- currentStatistics_ = NULL;
-
- return 1;
-}
-
-int Proxy::handleNegotiation(const unsigned char *message, unsigned int length)
-{
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Writing data during proxy "
- << "negotiation is not implemented.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Writing data during proxy "
- << "negotiation is not implemented.\n";
-
- return -1;
-}
-
-int Proxy::handleNegotiationFromProxy(const unsigned char *message, unsigned int length)
-{
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Reading data during proxy "
- << "negotiation is not implemented.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Reading data during proxy "
- << "negotiation is not implemented.\n";
-
- return -1;
-}
-
-int Proxy::handleAlert(int alert)
-{
- if (handleControl(code_alert_request, alert) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleCloseConnection(int clientFd)
-{
- int channelId = getChannel(clientFd);
-
- if (channels_[channelId] != NULL &&
- channels_[channelId] -> getFinish() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Closing down the channel for FD#"
- << clientFd << ".\n" << logofs_flush;
- #endif
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
-
- return 1;
- }
-
- return 0;
-}
-
-int Proxy::handleCloseAllXConnections()
-{
- #ifdef TEST
- *logofs << "Proxy: Closing down any remaining X channel.\n"
- << logofs_flush;
- #endif
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL &&
- channels_[channelId] -> getType() == channel_x11 &&
- channels_[channelId] -> getFinish() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Closing down the channel for FD#"
- << getFd(channelId) << ".\n" << logofs_flush;
- #endif
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
- }
- }
-
- return 1;
-}
-
-int Proxy::handleCloseAllListeners()
-{
- // Since ProtoStep7 (#issue 108)
- if (finish_ == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Closing down all remote listeners.\n"
- << logofs_flush;
- #endif
-
- if (handleControl(code_finish_listeners) < 0)
- {
- return -1;
- }
-
- finish_ = 1;
- }
-
- return 1;
-}
-
-void Proxy::handleResetAlert()
-{
- if (alert_ != 0)
- {
- #ifdef TEST
- *logofs << "Proxy: The proxy alert '" << alert_
- << "' was displaced.\n" << logofs_flush;
- #endif
-
- alert_ = 0;
- }
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL)
- {
- channels_[channelId] -> handleResetAlert();
- }
- }
-}
-
-int Proxy::handleFinish(int channelId)
-{
- //
- // Send any outstanding encoded data and
- // do any finalization needed on the
- // channel.
- //
-
- if (needFlush(channelId) == 1)
- {
- if (channels_[channelId] -> getFinish() == 1)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! The finishing channel ID#"
- << channelId << " has data to flush.\n"
- << logofs_flush;
- #endif
- }
-
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: WARNING! Flushing data for the "
- << "finishing channel ID#" << channelId
- << ".\n" << logofs_flush;
- #endif
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- //
- // Reset the congestion state and the
- // timeouts, if needed.
- //
-
- congestions_[channelId] = 0;
-
- setSplitTimeout(channelId);
- setMotionTimeout(channelId);
-
- if (channels_[channelId] -> getFinish() == 0)
- {
- channels_[channelId] -> handleFinish();
-
- //
- // Force a failure in the case somebody
- // would try to read from the channel.
- //
-
- shutdown(getFd(channelId), SHUT_RD);
-
- //
- // If the failure was not originated by
- // the remote, send a channel shutdown
- // message.
- //
-
- if (channels_[channelId] -> getClosing() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Finishing channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " because of failure.\n"
- << logofs_flush;
- #endif
-
- if (handleControl(code_finish_connection, channelId) < 0)
- {
- return -1;
- }
- }
- }
-
- return 1;
-}
-
-int Proxy::handleFinishFromProxy(int channelId)
-{
- //
- // Check if this channel has pending
- // data to send.
- //
-
- if (needFlush(channelId) == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: WARNING! Flushing data for the "
- << "finishing channel ID#" << channelId
- << ".\n" << logofs_flush;
- #endif
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- //
- // Mark the channel. We will free its
- // resources at the next loop and will
- // send the drop message to the remote.
- //
-
- if (channels_[channelId] -> getClosing() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Marking channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " as closing.\n"
- << logofs_flush;
- #endif
-
- channels_[channelId] -> handleClosing();
- }
-
- if (channels_[channelId] -> getFinish() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Finishing channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " because of proxy.\n"
- << logofs_flush;
- #endif
-
- channels_[channelId] -> handleFinish();
- }
-
- if (handleFinish(channelId) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleDropFromProxy(int channelId)
-{
- //
- // Only mark the channel.
- //
-
- #ifdef TEST
- *logofs << "Proxy: Marking channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " as being dropped.\n"
- << logofs_flush;
- #endif
-
- if (channels_[channelId] -> getDrop() == 0)
- {
- channels_[channelId] -> handleDrop();
- }
-
- return 1;
-}
-
-//
-// Close the channel and deallocate all its
-// resources.
-//
-
-int Proxy::handleDrop(int channelId)
-{
- //
- // Check if this channel has pending
- // data to send.
- //
-
- if (needFlush(channelId) == 1)
- {
- if (channels_[channelId] -> getFinish() == 1)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! The dropping channel ID#"
- << channelId << " has data to flush.\n"
- << logofs_flush;
- #endif
- }
-
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: WARNING! Flushing data for the "
- << "dropping channel ID#" << channelId
- << ".\n" << logofs_flush;
- #endif
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- #ifdef TEST
- *logofs << "Proxy: Dropping channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << ".\n" << logofs_flush;
- #endif
-
- if (channels_[channelId] -> getFinish() == 0)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! The channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " was not marked as "
- << "finishing.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": The channel for FD#"
- << getFd(channelId) << " channel ID#"
- << channelId << " was not marked as "
- << "finishing.\n";
-
- channels_[channelId] -> handleFinish();
- }
-
- //
- // Send the channel shutdown message
- // to the peer proxy.
- //
-
- if (channels_[channelId] -> getClosing() == 1)
- {
- if (handleControl(code_drop_connection, channelId) < 0)
- {
- return -1;
- }
- }
-
- //
- // Get rid of the channel.
- //
-
- if (channels_[channelId] -> getType() != channel_x11)
- {
- #ifdef TEST
- *logofs << "Proxy: Closed connection to "
- << getTypeName(channels_[channelId] -> getType())
- << " server.\n" << logofs_flush;
- #endif
-
- cerr << "Info" << ": Closed connection to "
- << getTypeName(channels_[channelId] -> getType())
- << " server.\n";
- }
-
- delete channels_[channelId];
- channels_[channelId] = NULL;
-
- cleanupChannelMap(channelId);
-
- //
- // Get rid of the transport.
- //
-
- deallocateTransport(channelId);
-
- congestions_[channelId] = 0;
-
- decreaseChannels(channelId);
-
- //
- // Check if the channel was the
- // one currently selected for
- // output.
- //
-
- if (outputChannel_ == channelId)
- {
- outputChannel_ = -1;
- }
-
- return 1;
-}
-
-//
-// Send an empty message to the remote peer
-// to verify if the link is alive and let
-// the remote proxy detect a congestion.
-//
-
-int Proxy::handlePing()
-{
- T_timestamp nowTs = getTimestamp();
-
- #if defined(DEBUG) || defined(PING)
-
- *logofs << "Proxy: Checking ping at "
- << strMsTimestamp(nowTs) << logofs_flush;
-
- *logofs << " with last loop at "
- << strMsTimestamp(timeouts_.loopTs) << ".\n"
- << logofs_flush;
-
- *logofs << "Proxy: Last bytes in at "
- << strMsTimestamp(timeouts_.readTs) << logofs_flush;
-
- *logofs << " last bytes out at "
- << strMsTimestamp(timeouts_.writeTs) << ".\n"
- << logofs_flush;
-
- *logofs << "Proxy: Last ping at "
- << strMsTimestamp(timeouts_.pingTs) << ".\n"
- << logofs_flush;
-
- #endif
-
- //
- // Be sure we take into account any clock drift. This
- // can be caused by the user changing the system timer
- // or by small adjustments introduced by the operating
- // system making the clock go backward.
- //
-
- if (checkDiffTimestamp(timeouts_.loopTs, nowTs) == 0)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Detected drift in system "
- << "timer. Resetting to current time.\n"
- << logofs_flush;
- #endif
-
- timeouts_.pingTs = nowTs;
- timeouts_.readTs = nowTs;
- timeouts_.writeTs = nowTs;
- }
-
- //
- // Check timestamp of last read from remote proxy. It can
- // happen that we stayed in the main loop long enough to
- // have idle timeout expired, for example if the proxy was
- // stopped and restarted or because of an extremely high
- // load of the system. In this case we don't complain if
- // there is something new to read from the remote.
- //
-
- int diffIn = diffTimestamp(timeouts_.readTs, nowTs);
-
- if (diffIn >= (control -> PingTimeout * 2) -
- control -> LatencyTimeout)
- {
- //
- // Force a read to detect whether the remote proxy
- // aborted the connection.
- //
-
- int result = handleRead();
-
- if (result < 0)
- {
- #if defined(TEST) || defined(INFO) || defined(PING)
- *logofs << "Proxy: WARNING! Detected shutdown waiting "
- << "for the ping after " << diffIn / 1000
- << " seconds.\n" << logofs_flush;
- #endif
-
- return -1;
- }
- else if (result > 0)
- {
- diffIn = diffTimestamp(timeouts_.readTs, nowTs);
-
- if (handleFlush() < 0)
- {
- return -1;
- }
- }
- }
-
- if (diffIn >= (control -> PingTimeout * 2) -
- control -> LatencyTimeout)
- {
- #if defined(TEST) || defined(INFO) || defined(PING)
- *logofs << "Proxy: Detected congestion at "
- << strMsTimestamp() << " with " << diffIn / 1000
- << " seconds since the last read.\n"
- << logofs_flush;
- #endif
-
- //
- // There are two types of proxy congestion. The first,
- // affecting the ability of the proxy to write the
- // encoded data to the network, is controlled by the
- // congestion_ flag. The flag is raised when no data
- // is received from the remote proxy within a timeout.
- // On the X client side, the flag is also raised when
- // the proxy runs out of tokens.
- //
-
- if (control -> ProxyMode == proxy_server)
- {
- //
- // At X server side we must return to read data
- // from the channels after a while, because we
- // need to give a chance to the channel to read
- // the key sequence CTRL+ALT+SHIFT+ESC.
- //
-
- if (congestion_ == 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Forcibly entering congestion due to "
- << "timeout with " << tokens_[token_control].remaining
- << " tokens.\n" << logofs_flush;
- #endif
-
- congestion_ = 1;
- }
- else
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Forcibly exiting congestion due to "
- << "timeout with " << tokens_[token_control].remaining
- << " tokens.\n" << logofs_flush;
- #endif
-
- congestion_ = 0;
- }
- }
- else
- {
- #if defined(TEST) || defined(INFO)
-
- if (congestion_ == 0)
- {
- *logofs << "Proxy: Entering congestion due to timeout "
- << "with " << tokens_[token_control].remaining
- << " tokens.\n" << logofs_flush;
- }
-
- #endif
-
- congestion_ = 1;
- }
-
- if (control -> ProxyTimeout > 0 &&
- diffIn >= (control -> ProxyTimeout -
- control -> LatencyTimeout))
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! No data received from "
- << "remote proxy on FD#" << fd_ << " within "
- << (diffIn + control -> LatencyTimeout) / 1000
- << " seconds.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": No data received from remote "
- << "proxy within " << (diffIn + control ->
- LatencyTimeout) / 1000 << " seconds.\n";
-
- HandleAbort();
- }
- else
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: WARNING! No data received from "
- << "remote proxy on FD#" << fd_ << " since "
- << diffIn << " Ms.\n" << logofs_flush;
- #endif
-
- if (control -> ProxyTimeout > 0 &&
- isTimestamp(timeouts_.alertTs) == 0 &&
- diffIn >= (control -> ProxyTimeout -
- control -> LatencyTimeout) / 4)
- {
- //
- // If we are in the middle of a shutdown
- // procedure but the remote is not resp-
- // onding, force the closure of the link.
- //
-
- if (finish_ != 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! No response received from "
- << "the remote proxy on FD#" << fd_ << " while "
- << "waiting for the shutdown.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": No response received from remote "
- << "proxy while waiting for the shutdown.\n";
-
- HandleAbort();
- }
- else
- {
- cerr << "Warning" << ": No data received from remote "
- << "proxy within " << (diffIn + control ->
- LatencyTimeout) / 1000 << " seconds.\n";
-
- if (alert_ == 0)
- {
- if (control -> ProxyMode == proxy_client)
- {
- alert_ = CLOSE_DEAD_PROXY_CONNECTION_CLIENT_ALERT;
- }
- else
- {
- alert_ = CLOSE_DEAD_PROXY_CONNECTION_SERVER_ALERT;
- }
-
- HandleAlert(alert_, 1);
- }
-
- timeouts_.alertTs = nowTs;
- }
- }
- }
- }
-
- //
- // Check if we need to update the congestion
- // counter.
- //
-
- int diffOut = diffTimestamp(timeouts_.writeTs, nowTs);
-
- if (agent_ != nothing && congestions_[agent_] == 0 &&
- statistics -> getCongestionInFrame() >= 1 &&
- diffOut >= (control -> IdleTimeout -
- control -> LatencyTimeout * 5))
- {
- #if defined(TEST) || defined(INFO) || defined(PING)
- *logofs << "Proxy: Forcing an update of the "
- << "congestion counter after timeout.\n"
- << logofs_flush;
- #endif
-
- statistics -> updateCongestion(tokens_[token_control].remaining,
- tokens_[token_control].limit);
- }
-
- //
- // Send a new token if we didn't send any data to
- // the remote for longer than the ping timeout.
- // The client side sends a token, the server side
- // responds with a token reply.
- //
- // VMWare virtual machines can have the system
- // timer deadly broken. Try to send a ping regard-
- // less we are the client or the server proxy to
- // force a write by the remote.
- //
-
- if (control -> ProxyMode == proxy_client ||
- diffIn >= (control -> PingTimeout * 4) -
- control -> LatencyTimeout)
- {
- //
- // We need to send a new ping even if we didn't
- // receive anything from the remote within the
- // ping timeout. The server side will respond
- // to our ping, so we use the ping to force the
- // remote end to send some data.
- //
-
- if (diffIn >= (control -> PingTimeout -
- control -> LatencyTimeout * 5) ||
- diffOut >= (control -> PingTimeout -
- control -> LatencyTimeout * 5))
- {
- int diffPing = diffTimestamp(timeouts_.pingTs, nowTs);
-
- if (diffPing < 0 || diffPing >= (control -> PingTimeout -
- control -> LatencyTimeout * 5))
- {
- #if defined(TEST) || defined(INFO) || defined(PING)
- *logofs << "Proxy: Sending a new ping at " << strMsTimestamp()
- << " with " << tokens_[token_control].remaining
- << " tokens and elapsed in " << diffIn << " out "
- << diffOut << " ping " << diffPing
- << ".\n" << logofs_flush;
- #endif
-
- if (handleFrame(frame_ping) < 0)
- {
- return -1;
- }
-
- timeouts_.pingTs = nowTs;
- }
- #if defined(TEST) || defined(INFO) || defined(PING)
- else
- {
- *logofs << "Proxy: Not sending a new ping with "
- << "elapsed in " << diffIn << " out "
- << diffOut << " ping " << diffPing
- << ".\n" << logofs_flush;
- }
- #endif
- }
- }
-
- return 1;
-}
-
-int Proxy::handleSyncFromProxy(int channelId)
-{
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: WARNING! Received a synchronization "
- << "request from the remote proxy.\n"
- << logofs_flush;
- #endif
-
- if (handleControl(code_sync_reply, channelId) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleResetStores()
-{
- //
- // Recreate the message stores.
- //
-
- delete clientStore_;
- delete serverStore_;
-
- clientStore_ = new ClientStore(compressor_);
- serverStore_ = new ServerStore(compressor_);
-
- timeouts_.loadTs = nullTimestamp();
-
- //
- // Replace message stores in channels.
- //
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL)
- {
- if (channels_[channelId] -> setStores(clientStore_, serverStore_) < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Failed to replace message stores in "
- << "channel for FD#" << getFd(channelId) << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Failed to replace message stores in "
- << "channel for FD#" << getFd(channelId) << ".\n";
-
- return -1;
- }
- #ifdef TEST
- else
- {
- *logofs << "Proxy: Replaced message stores in channel "
- << "for FD#" << getFd(channelId) << ".\n"
- << logofs_flush;
- }
- #endif
- }
- }
-
- return 1;
-}
-
-int Proxy::handleResetPersistentCache()
-{
- char *fullName = new char[strlen(control -> PersistentCachePath) +
- strlen(control -> PersistentCacheName) + 2];
-
- strcpy(fullName, control -> PersistentCachePath);
- strcat(fullName, "/");
- strcat(fullName, control -> PersistentCacheName);
-
- #ifdef TEST
- *logofs << "Proxy: Going to remove persistent cache file '"
- << fullName << "'\n" << logofs_flush;
- #endif
-
- unlink(fullName);
-
- delete [] fullName;
-
- delete [] control -> PersistentCacheName;
-
- control -> PersistentCacheName = NULL;
-
- return 1;
-}
-
-void Proxy::handleResetFlush()
-{
- #ifdef TEST
- *logofs << "Proxy: Going to reset flush counters "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- //
- // Reset the proxy priority flag.
- //
-
- priority_ = 0;
-
- //
- // Restore buffers to their initial
- // size.
- //
-
- transport_ -> partialReset();
-
- //
- // Update the timestamp of the last
- // write operation performed on the
- // socket.
- //
-
- timeouts_.writeTs = getTimestamp();
-}
-
-int Proxy::handleFinish()
-{
- //
- // Reset the timestamps to give the proxy
- // another chance to show the 'no response'
- // dialog if the shutdown message doesn't
- // come in time.
- //
-
- timeouts_.readTs = getTimestamp();
-
- timeouts_.alertTs = nullTimestamp();
-
- finish_ = 1;
-
- return 1;
-}
-
-int Proxy::handleShutdown()
-{
- //
- // Send shutdown message to remote proxy.
- //
-
- shutdown_ = 1;
-
- handleControl(code_shutdown_request);
-
- #ifdef TEST
- *logofs << "Proxy: Starting shutdown procedure "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- //
- // Ensure that all the data accumulated
- // in the transport buffer is flushed
- // to the network layer.
- //
-
- for (int i = 0; i < 100; i++)
- {
- if (canFlush() == 1)
- {
- handleFlush();
- }
- else
- {
- break;
- }
-
- usleep(100000);
- }
-
- //
- // Now wait for the network layers to
- // consume all the data.
- //
-
- for (int i = 0; i < 100; i++)
- {
- if (transport_ -> queued() <= 0)
- {
- break;
- }
-
- usleep(100000);
- }
-
- //
- // Give time to the remote end to read
- // the shutdown message and close the
- // connection.
- //
-
- transport_ -> wait(10000);
-
- #ifdef TEST
- *logofs << "Proxy: Ending shutdown procedure "
- << "for proxy FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- return 1;
-}
-
-int Proxy::handleChannelConfiguration()
-{
- if (activeChannels_.getSize() == 0)
- {
- #ifdef TEST
- *logofs << "Proxy: Going to initialize the static "
- << "members in channels for proxy FD#"
- << fd_ << ".\n" << logofs_flush;
- #endif
-
- Channel::setReferences();
-
- ClientChannel::setReferences();
- ServerChannel::setReferences();
-
- GenericChannel::setReferences();
- }
-
- return 1;
-}
-
-int Proxy::handleSocketConfiguration()
-{
- //
- // Set linger mode on proxy to correctly
- // get shutdown notification.
- //
-
- SetLingerTimeout(fd_, 30);
-
- //
- // Set keep-alive on socket so that if remote link
- // terminates abnormally (as killed hard or because
- // of a power-off) process will get a SIGPIPE. In
- // practice this is useless as proxies already ping
- // each other every few seconds.
- //
-
- if (control -> OptionProxyKeepAlive == 1)
- {
- SetKeepAlive(fd_);
- }
-
- //
- // Set 'priority' flag at TCP layer for path
- // proxy-to-proxy. Look at IPTOS_LOWDELAY in
- // man 7 ip.
- //
-
- if (control -> OptionProxyLowDelay == 1)
- {
- SetLowDelay(fd_);
- }
-
- //
- // Update size of TCP send and receive buffers.
- //
-
- if (control -> OptionProxySendBuffer != -1)
- {
- SetSendBuffer(fd_, control -> OptionProxySendBuffer);
- }
-
- if (control -> OptionProxyReceiveBuffer != -1)
- {
- SetReceiveBuffer(fd_, control -> OptionProxyReceiveBuffer);
- }
-
- //
- // Update TCP_NODELAY settings. Note that on old Linux
- // kernels turning off the Nagle algorithm didn't work
- // when proxy was run through a PPP link. Trying to do
- // so caused the kernel to stop delivering data to us
- // if a serious network congestion was encountered.
- //
-
- if (control -> ProxyMode == proxy_client)
- {
- if (control -> OptionProxyClientNoDelay != -1)
- {
- SetNoDelay(fd_, control -> OptionProxyClientNoDelay);
- }
- }
- else
- {
- if (control -> OptionProxyServerNoDelay != -1)
- {
- SetNoDelay(fd_, control -> OptionProxyServerNoDelay);
- }
- }
-
- return 1;
-}
-
-int Proxy::handleLinkConfiguration()
-{
- #ifdef TEST
- *logofs << "Proxy: Propagating parameters to "
- << "channels' read buffers.\n"
- << logofs_flush;
- #endif
-
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL)
- {
- channels_[channelId] -> handleConfiguration();
- }
- }
-
- #ifdef TEST
- *logofs << "Proxy: Propagating parameters to "
- << "proxy buffers.\n"
- << logofs_flush;
- #endif
-
- readBuffer_.setSize(control -> ProxyInitialReadSize,
- control -> ProxyMaximumBufferSize);
-
- encodeBuffer_.setSize(control -> TransportProxyBufferSize,
- control -> TransportProxyBufferThreshold,
- control -> TransportMaximumBufferSize);
-
- transport_ -> setSize(control -> TransportProxyBufferSize,
- control -> TransportProxyBufferThreshold,
- control -> TransportMaximumBufferSize);
-
- #ifdef TEST
- *logofs << "Proxy: Configuring the proxy timeouts.\n"
- << logofs_flush;
- #endif
-
- timeouts_.split = control -> SplitTimeout;
- timeouts_.motion = control -> MotionTimeout;
-
- #ifdef TEST
- *logofs << "Proxy: Configuring the proxy tokens.\n"
- << logofs_flush;
- #endif
-
- tokens_[token_control].size = control -> TokenSize;
- tokens_[token_control].limit = control -> TokenLimit;
-
- if (tokens_[token_control].limit < 1)
- {
- tokens_[token_control].limit = 1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(LIMIT)
- *logofs << "Proxy: TOKEN! LIMIT! Setting token ["
- << DumpToken(token_control) << "] size to "
- << tokens_[token_control].size << " and limit to "
- << tokens_[token_control].limit << ".\n"
- << logofs_flush;
- #endif
-
- tokens_[token_split].size = control -> TokenSize;
- tokens_[token_split].limit = control -> TokenLimit / 2;
-
- if (tokens_[token_split].limit < 1)
- {
- tokens_[token_split].limit = 1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(LIMIT)
- *logofs << "Proxy: TOKEN! LIMIT! Setting token ["
- << DumpToken(token_split) << "] size to "
- << tokens_[token_split].size << " and limit to "
- << tokens_[token_split].limit << ".\n"
- << logofs_flush;
- #endif
-
- tokens_[token_data].size = control -> TokenSize;
- tokens_[token_data].limit = control -> TokenLimit / 4;
-
- if (tokens_[token_data].limit < 1)
- {
- tokens_[token_data].limit = 1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(LIMIT)
- *logofs << "Proxy: TOKEN! LIMIT! Setting token ["
- << DumpToken(token_data) << "] size to "
- << tokens_[token_data].size << " and limit to "
- << tokens_[token_data].limit << ".\n"
- << logofs_flush;
- #endif
-
- for (int i = token_control; i <= token_data; i++)
- {
- tokens_[i].remaining = tokens_[i].limit;
- }
-
- #if defined(TEST) || defined(INFO) || defined(LIMIT)
- *logofs << "Proxy: LIMIT! Using client bitrate "
- << "limit " << control -> ClientBitrateLimit
- << " server bitrate limit " << control ->
- ServerBitrateLimit << " with local limit "
- << control -> LocalBitrateLimit << ".\n"
- << logofs_flush;
- #endif
-
- //
- // Set the other parameters based on
- // the token size.
- //
-
- int base = control -> TokenSize;
-
- control -> SplitDataThreshold = base * 4;
- control -> SplitDataPacketLimit = base / 2;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: LIMIT! Setting split data threshold "
- << "to " << control -> SplitDataThreshold
- << " split packet limit to " << control ->
- SplitDataPacketLimit << " with base "
- << base << ".\n" << logofs_flush;
- #endif
-
- //
- // Set the number of bytes read from the
- // data channels at each loop. This will
- // basically determine the maximum band-
- // width available for the generic chan-
- // nels.
- //
-
- control -> GenericInitialReadSize = base / 2;
- control -> GenericMaximumBufferSize = base / 2;
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: LIMIT! Setting generic channel "
- << "initial read size to " << control ->
- GenericInitialReadSize << " maximum read "
- << "size to " << control -> GenericMaximumBufferSize
- << " with base " << base << ".\n"
- << logofs_flush;
- #endif
-
- return 1;
-}
-
-int Proxy::handleCacheConfiguration()
-{
- #ifdef TEST
- *logofs << "Proxy: Configuring cache according to pack parameters.\n"
- << logofs_flush;
- #endif
-
- //
- // Further adjust the cache parameters. If
- // packing of the images is enabled, reduce
- // the size available for plain images.
- //
-
- if (control -> SessionMode == session_agent)
- {
- if (control -> PackMethod != NO_PACK)
- {
- clientStore_ -> getRequestStore(X_PutImage) ->
- cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_PACKED;
-
- clientStore_ -> getRequestStore(X_PutImage) ->
- cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED;
- }
- }
-
- //
- // If this is a shadow session increase the
- // size of the image cache.
- //
-
- if (control -> SessionMode == session_shadow)
- {
- if (control -> PackMethod != NO_PACK)
- {
- clientStore_ -> getRequestStore(X_NXPutPackedImage) ->
- cacheThreshold = PUTPACKEDIMAGE_CACHE_THRESHOLD_IF_PACKED_SHADOW;
-
- clientStore_ -> getRequestStore(X_NXPutPackedImage) ->
- cacheLowerThreshold = PUTPACKEDIMAGE_CACHE_LOWER_THRESHOLD_IF_PACKED_SHADOW;
- }
- else
- {
- clientStore_ -> getRequestStore(X_PutImage) ->
- cacheThreshold = PUTIMAGE_CACHE_THRESHOLD_IF_SHADOW;
-
- clientStore_ -> getRequestStore(X_PutImage) ->
- cacheLowerThreshold = PUTIMAGE_CACHE_LOWER_THRESHOLD_IF_SHADOW;
- }
- }
-
- return 1;
-}
-
-int Proxy::handleSaveStores()
-{
- //
- // Save content of stores on disk.
- //
-
- char *cacheToAdopt = NULL;
-
- //
- // Set to false the indicator for cumulative store
- // size too small
- //
- bool isTooSmall = false;
-
- if (control -> PersistentCacheEnableSave)
- {
- #ifdef TEST
- *logofs << "Proxy: Going to save content of client store.\n"
- << logofs_flush;
- #endif
-
- cacheToAdopt = handleSaveAllStores(control -> PersistentCachePath, isTooSmall);
- }
- #ifdef TEST
- else
- {
- if (control -> ProxyMode == proxy_client)
- {
- *logofs << "Proxy: Saving persistent cache to disk disabled.\n"
- << logofs_flush;
- }
- else
- {
- *logofs << "Proxy: PANIC! Protocol violation in command save.\n"
- << logofs_flush;
-
- cerr << "Error" << ": Protocol violation in command save.\n";
-
- HandleCleanup();
- }
- }
- #endif
-
- if (cacheToAdopt != NULL)
- {
- //
- // Do we have a cache already?
- //
-
- if (control -> PersistentCacheName != NULL)
- {
- //
- // Check if old and new cache are the same.
- // In this case don't remove the old cache.
- //
-
- if (strcasecmp(control -> PersistentCacheName, cacheToAdopt) != 0)
- {
- handleResetPersistentCache();
- }
-
- delete [] control -> PersistentCacheName;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Setting current persistent cache file to '"
- << cacheToAdopt << "'\n" << logofs_flush;
- #endif
-
- control -> PersistentCacheName = cacheToAdopt;
-
- return 1;
- }
- else
- {
- #ifdef TEST
- *logofs << "Proxy: No cache file produced from message stores.\n"
- << logofs_flush;
- #endif
-
- //
- // It can be that we didn't generate a new cache
- // because store was too small or persistent cache
- // was disabled. This is not an error.
- //
-
- if (control -> PersistentCacheEnableSave && !isTooSmall)
- {
- return -1;
- }
- else
- {
- return 0;
- }
- }
-}
-
-int Proxy::handleLoadStores()
-{
- //
- // Restore the content of the client store
- // from disk if a valid cache was negotiated
- // at session startup.
- //
-
- if (control -> PersistentCacheEnableLoad == 1 &&
- control -> PersistentCachePath != NULL &&
- control -> PersistentCacheName != NULL)
- {
- #ifdef TEST
- *logofs << "Proxy: Going to load content of client store.\n"
- << logofs_flush;
- #endif
-
- //
- // Returns the same string passed as name of
- // the cache, or NULL if it was not possible
- // to load the cache from disk.
- //
-
- if (handleLoadAllStores(control -> PersistentCachePath,
- control -> PersistentCacheName) == NULL)
- {
- //
- // The corrupted cache should have been
- // removed from disk. Get rid of the
- // reference so we don't try to delete
- // it once again.
- //
-
- if (control -> PersistentCacheName != NULL)
- {
- delete [] control -> PersistentCacheName;
- }
-
- control -> PersistentCacheName = NULL;
-
- return -1;
- }
-
- //
- // Set timestamp of last time cache
- // was loaded from data on disk.
- //
-
- timeouts_.loadTs = getTimestamp();
-
- return 1;
- }
- #ifdef TEST
- else
- {
- if (control -> ProxyMode == proxy_client)
- {
- *logofs << "Proxy: Loading of cache disabled or no cache file selected.\n"
- << logofs_flush;
- }
- else
- {
- *logofs << "Proxy: PANIC! Protocol violation in command load.\n"
- << logofs_flush;
-
- cerr << "Error" << ": Protocol violation in command load.\n";
-
- HandleCleanup();
- }
- }
- #endif
-
- return 0;
-}
-
-int Proxy::handleControl(T_proxy_code code, int data)
-{
- //
- // Send the given control messages
- // to the remote proxy.
- //
-
- #if defined(TEST) || defined(INFO)
-
- if (data != -1)
- {
- if (code == code_control_token_reply ||
- code == code_split_token_reply ||
- code == code_data_token_reply)
- {
- *logofs << "Proxy: TOKEN! Sending message '" << DumpControl(code)
- << "' at " << strMsTimestamp() << " with count "
- << data << ".\n" << logofs_flush;
- }
- else
- {
- *logofs << "Proxy: Sending message '" << DumpControl(code)
- << "' at " << strMsTimestamp() << " with data ID#"
- << data << ".\n" << logofs_flush;
- }
- }
- else
- {
- *logofs << "Proxy: Sending message '" << DumpControl(code)
- << "' at " << strMsTimestamp() << ".\n"
- << logofs_flush;
- }
-
- #endif
-
- //
- // Add the control message and see if the
- // data has to be flushed immediately.
- //
-
- if (addControlCodes(code, data) < 0)
- {
- return -1;
- }
-
- switch (code)
- {
- //
- // Append the first data read from the opened
- // channel to the control code.
- //
-
- case code_new_x_connection:
- case code_new_cups_connection:
- case code_new_aux_connection:
- case code_new_smb_connection:
- case code_new_media_connection:
- case code_new_http_connection:
- case code_new_font_connection:
- case code_new_slave_connection:
-
- //
- // Do we send the token reply immediately?
- // The control messages are put at the begin-
- // ning of the of the encode buffer, so we may
- // reply to multiple tokens before having the
- // chance of handling the actual frame data.
- // On the other hand, the sooner we reply, the
- // sooner the remote proxy is restarted.
- //
-
- case code_control_token_reply:
- case code_split_token_reply:
- case code_data_token_reply:
- {
- break;
- }
-
- //
- // Also send the congestion control codes
- // immediately.
- //
- // case code_begin_congestion:
- // case code_end_congestion:
- //
-
- default:
- {
- priority_ = 1;
-
- break;
- }
- }
-
- if (priority_ == 1)
- {
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- return 1;
-}
-
-int Proxy::handleSwitch(int channelId)
-{
- //
- // If data is for a different channel than last
- // selected for output, prepend to the data the
- // new channel id.
- //
-
- #ifdef DEBUG
- *logofs << "Proxy: Requested a switch with "
- << "current channel ID#" << outputChannel_
- << " new channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- if (channelId != outputChannel_)
- {
- if (needFlush() == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(FLUSH)
- *logofs << "Proxy: WARNING! Flushing data for the "
- << "previous channel ID#" << outputChannel_
- << ".\n" << logofs_flush;
- #endif
-
- if (handleFrame(frame_data) < 0)
- {
- return -1;
- }
- }
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Sending message '"
- << DumpControl(code_switch_connection) << "' at "
- << strMsTimestamp() << " with FD#" << getFd(channelId)
- << " channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- if (addControlCodes(code_switch_connection, channelId) < 0)
- {
- return -1;
- }
-
- outputChannel_ = channelId;
- }
-
- return 1;
-}
-
-int Proxy::addTokenCodes(T_proxy_token &token)
-{
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: TOKEN! Sending token ["
- << DumpToken(token.type) << "] with "
- << token.bytes << " bytes accumulated size "
- << token.size << " and " << token.remaining
- << " available.\n" << logofs_flush;
- #endif
-
- //
- // Give a 'weight' to the token. The tokens
- // remaining can become negative if we sent
- // a packet that was exceptionally big.
- //
-
- int count = 0;
-
- // Since ProtoStep7 (#issue 108)
- count = token.bytes / token.size;
-
- //
- // Force a count of 1, for example
- // if this is a ping.
- //
-
- if (count < 1)
- {
- count = 1;
-
- token.bytes = 0;
- }
- else
- {
- // Since ProtoStep7 (#issue 108)
- if (count > 255)
- {
- count = 255;
- }
-
- //
- // Let the next token account for the
- // remaining bytes.
- //
-
- token.bytes %= token.size;
- }
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: Sending message '"
- << DumpControl(token.request) << "' at "
- << strMsTimestamp() << " with count " << count
- << ".\n" << logofs_flush;
- #endif
-
- controlCodes_[controlLength_++] = 0;
- controlCodes_[controlLength_++] = (unsigned char) token.request;
- controlCodes_[controlLength_++] = (unsigned char) count;
-
- statistics -> addFrameOut();
-
- token.remaining -= count;
-
- return 1;
-}
-
-int Proxy::handleToken(T_frame_type type)
-{
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: TOKEN! Checking tokens with "
- << "frame type [";
-
- *logofs << (type == frame_ping ? "frame_ping" : "frame_data");
-
- *logofs << "] with stream ratio " << statistics ->
- getStreamRatio() << ".\n" << logofs_flush;
- #endif
-
- if (type == frame_data)
- {
- //
- // Since ProtoStep7 (#issue 108)
- //
-
- // Send a distinct token for each data type.
- // We don't want to slow down the sending of
- // the X events, X replies and split confir-
- // mation events on the X server side, so
- // take care only of the generic data token.
- //
-
- if (control -> ProxyMode == proxy_client)
- {
- statistics -> updateControlToken(tokens_[token_control].bytes);
-
- if (tokens_[token_control].bytes > tokens_[token_control].size)
- {
- if (addTokenCodes(tokens_[token_control]) < 0)
- {
- return -1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
-
- T_proxy_token &token = tokens_[token_control];
-
- *logofs << "Proxy: TOKEN! Token class ["
- << DumpToken(token.type) << "] has now "
- << token.bytes << " bytes accumulated and "
- << token.remaining << " tokens remaining.\n"
- << logofs_flush;
- #endif
- }
-
- statistics -> updateSplitToken(tokens_[token_split].bytes);
-
- if (tokens_[token_split].bytes > tokens_[token_split].size)
- {
- if (addTokenCodes(tokens_[token_split]) < 0)
- {
- return -1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
-
- T_proxy_token &token = tokens_[token_split];
-
- *logofs << "Proxy: TOKEN! Token class ["
- << DumpToken(token.type) << "] has now "
- << token.bytes << " bytes accumulated and "
- << token.remaining << " tokens remaining.\n"
- << logofs_flush;
- #endif
- }
- }
-
- statistics -> updateDataToken(tokens_[token_data].bytes);
-
- if (tokens_[token_data].bytes > tokens_[token_data].size)
- {
- if (addTokenCodes(tokens_[token_data]) < 0)
- {
- return -1;
- }
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
-
- T_proxy_token &token = tokens_[token_data];
-
- *logofs << "Proxy: TOKEN! Token class ["
- << DumpToken(token.type) << "] has now "
- << token.bytes << " bytes accumulated and "
- << token.remaining << " tokens remaining.\n"
- << logofs_flush;
- #endif
- }
- }
- else
- {
- if (addTokenCodes(tokens_[token_control]) < 0)
- {
- return -1;
- }
-
- //
- // Reset all counters on a ping.
- //
-
- tokens_[token_control].bytes = 0;
- tokens_[token_split].bytes = 0;
- tokens_[token_data].bytes = 0;
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
-
- T_proxy_token &token = tokens_[token_control];
-
- *logofs << "Proxy: TOKEN! Token class ["
- << DumpToken(token.type) << "] has now "
- << token.bytes << " bytes accumulated and "
- << token.remaining << " tokens remaining.\n"
- << logofs_flush;
- #endif
- }
-
- //
- // Check if we have entered in
- // congestion state.
- //
-
- if (congestion_ == 0 &&
- tokens_[token_control].remaining <= 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Entering congestion with "
- << tokens_[token_control].remaining
- << " tokens remaining.\n" << logofs_flush;
- #endif
-
- congestion_ = 1;
- }
-
- statistics -> updateCongestion(tokens_[token_control].remaining,
- tokens_[token_control].limit);
-
- return 1;
-}
-
-int Proxy::handleTokenFromProxy(T_proxy_token &token, int count)
-{
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: TOKEN! Received token ["
- << DumpToken(token.type) << "] request at "
- << strMsTimestamp() << " with count "
- << count << ".\n" << logofs_flush;
- #endif
-
- //
- // Since ProtoStep7 (#issue 108) with no limitations
- // concerning invalid token requests at this point
- //
-
- //
- // Add our token reply.
- //
-
- if (handleControl(token.reply, count) < 0)
- {
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::handleTokenReplyFromProxy(T_proxy_token &token, int count)
-{
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: TOKEN! Received token ["
- << DumpToken(token.type) << "] reply at "
- << strMsTimestamp() << " with count " << count
- << ".\n" << logofs_flush;
- #endif
-
- //
- // Since ProtoStep7 (#issue 108) with no limitations
- // concerning invalid token requests at this point
- //
-
- //
- // Increment the available tokens.
- //
-
- token.remaining += count;
-
- if (token.remaining > token.limit)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Token overflow handling messages.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Token overflow handling messages.\n";
-
- HandleCleanup();
- }
-
- #if defined(TEST) || defined(INFO) || defined(TOKEN)
- *logofs << "Proxy: TOKEN! Token class ["
- << DumpToken(token.type) << "] has now " << token.bytes
- << " bytes accumulated and " << token.remaining
- << " tokens remaining.\n" << logofs_flush;
- #endif
-
- //
- // Check if we can jump out of the
- // congestion state.
- //
-
- if (congestion_ == 1 &&
- tokens_[token_control].remaining > 0)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: Exiting congestion with "
- << tokens_[token_control].remaining
- << " tokens remaining.\n" << logofs_flush;
- #endif
-
- congestion_ = 0;
- }
-
- statistics -> updateCongestion(tokens_[token_control].remaining,
- tokens_[token_control].limit);
-
- return 1;
-}
-
-void Proxy::handleFailOnSave(const char *fullName, const char *failContext) const
-{
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Error saving stores to cache file "
- << "in context [" << failContext << "].\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Error saving stores to cache file "
- << "in context [" << failContext << "].\n";
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Removing invalid cache '"
- << fullName << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Removing invalid cache '"
- << fullName << "'.\n";
-
- unlink(fullName);
-}
-
-void Proxy::handleFailOnLoad(const char *fullName, const char *failContext) const
-{
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Error loading stores from cache file "
- << "in context [" << failContext << "].\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Error loading stores from cache file "
- << "in context [" << failContext << "].\n";
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Removing invalid cache '"
- << fullName << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Removing invalid cache '"
- << fullName << "'.\n";
-
- unlink(fullName);
-}
-
-int Proxy::handleSaveVersion(unsigned char *buffer, int &major,
- int &minor, int &patch) const
-{
- // Since ProtoStep8 (#issue 108)
- major = 3;
- minor = 0;
- patch = 0;
-
- *(buffer + 0) = major;
- *(buffer + 1) = minor;
-
- PutUINT(patch, buffer + 2, storeBigEndian());
-
- return 1;
-}
-
-int Proxy::handleLoadVersion(const unsigned char *buffer, int &major,
- int &minor, int &patch) const
-{
- major = *(buffer + 0);
- minor = *(buffer + 1);
-
- patch = GetUINT(buffer + 2, storeBigEndian());
-
- //
- // Force the proxy to discard the
- // incompatible caches.
- //
-
- // Since ProtoStep8 (#issue 108)
- if (major < 3)
- {
- return -1;
- }
-
- return 1;
-}
-
-char *Proxy::handleSaveAllStores(const char *savePath, bool & isTooSmall) const
-{
- isTooSmall = false;
-
- int cumulativeSize = MessageStore::getCumulativeTotalStorageSize();
-
- if (cumulativeSize < control -> PersistentCacheThreshold)
- {
- #ifdef TEST
- *logofs << "Proxy: Cache not saved as size is "
- << cumulativeSize << " with threshold set to "
- << control -> PersistentCacheThreshold
- << ".\n" << logofs_flush;
- #endif
-
- //
- // Cumulative store size is smaller than threshold
- // so the indicator is set to true
- //
-
- isTooSmall = true;
-
- return NULL;
- }
- else if (savePath == NULL)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! No name provided for save path.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": No name provided for save path.\n";
-
- return NULL;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Going to save content of message stores.\n"
- << logofs_flush;
- #endif
-
- //
- // Our parent process is likely going to terminate.
- // Until we finish saving cache we must ignore its
- // SIGIPE.
- //
-
- DisableSignals();
-
- ofstream *cachefs = NULL;
-
- md5_state_t *md5StateStream = NULL;
- md5_byte_t *md5DigestStream = NULL;
-
- md5_state_t *md5StateClient = NULL;
- md5_byte_t *md5DigestClient = NULL;
-
- char md5String[MD5_LENGTH * 2 + 2];
-
- char fullName[strlen(savePath) + MD5_LENGTH * 2 + 4];
-
- //
- // Prepare the template for the temporary file
- //
-
- const char* const uniqueTemplate = "XXXXXX";
- char tempName[strlen(savePath) + strlen("/") + 4 + strlen(uniqueTemplate) + 1];
-
- snprintf(tempName, sizeof tempName, "%s/%s%s",
- savePath,
- control -> ProxyMode == proxy_client ?
- "Z-C-" :
- "Z-S-",
- uniqueTemplate);
-
- #ifdef TEST
- *logofs << "Proxy: Generating temporary file with template '"
- << tempName << "'.\n" << logofs_flush;
- #endif
-
- //
- // Change the mask to make the file only
- // readable by the user, then restore the
- // old mask.
- //
-
- mode_t fileMode = umask(0077);
-
- //
- // Generate a unique temporary filename from tempName
- // and then create and open the file
- //
-
- int fdTemp = mkstemp(tempName);
- if (fdTemp == -1)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't create temporary file in '"
- << savePath << "'. Cause = " << strerror(errno) << ".\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't create temporary file in '"
- << savePath << "'. Cause = " << strerror(errno) << ".\n";
-
- umask(fileMode);
-
- EnableSignals();
-
- return NULL;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Saving cache to file '"
- << tempName << "'.\n" << logofs_flush;
- #endif
-
- //
- // Create and open the output stream for the new temporary
- // file
- //
-
- cachefs = new (std::nothrow) ofstream(tempName, ios::out | ios::binary);
- if ((cachefs == NULL) || cachefs->fail())
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't create stream for temporary file '"
- << tempName << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't create stream for temporary file '"
- << tempName << "'.\n";
-
- close(fdTemp);
- unlink(tempName);
-
- umask(fileMode);
-
- EnableSignals();
-
- return NULL;
- }
-
- //
- // Close the file descriptor returned by mkstemp
- // and restore the old mask
- //
-
- close(fdTemp);
- umask(fileMode);
-
- md5StateStream = new md5_state_t();
- md5DigestStream = new md5_byte_t[MD5_LENGTH];
-
- md5_init(md5StateStream);
-
- //
- // First write the proxy version.
- //
-
- unsigned char version[4];
-
- int major;
- int minor;
- int patch;
-
- handleSaveVersion(version, major, minor, patch);
-
- #ifdef TEST
- *logofs << "Proxy: Saving cache using version '"
- << major << "." << minor << "." << patch
- << "'.\n" << logofs_flush;
- #endif
-
- if (PutData(cachefs, version, 4) < 0)
- {
- handleFailOnSave(tempName, "A");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- EnableSignals();
-
- return NULL;
- }
-
- //
- // Make space for the calculated MD5 so we
- // can later rewind the file and write it
- // at this position.
- //
-
- if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0)
- {
- handleFailOnSave(tempName, "B");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- EnableSignals();
-
- return NULL;
- }
-
- md5StateClient = new md5_state_t();
- md5DigestClient = new md5_byte_t[MD5_LENGTH];
-
- md5_init(md5StateClient);
-
- #ifdef DUMP
-
- ofstream *cacheDump = NULL;
-
- ofstream *tempfs = (ofstream*) logofs;
-
- char cacheDumpName[DEFAULT_STRING_LENGTH];
-
- if (control -> ProxyMode == proxy_client)
- {
- snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1,
- "%s/client-cache-dump", control -> TempPath);
- }
- else
- {
- snprintf(cacheDumpName, DEFAULT_STRING_LENGTH - 1,
- "%s/server-cache-dump", control -> TempPath);
- }
-
- *(cacheDumpName + DEFAULT_STRING_LENGTH - 1) = '\0';
-
- fileMode = umask(0077);
-
- cacheDump = new ofstream(cacheDumpName, ios::out);
-
- umask(fileMode);
-
- logofs = cacheDump;
-
- #endif
-
- //
- // Use the virtual method of the concrete proxy class.
- //
-
- int allSaved = handleSaveAllStores(cachefs, md5StateStream, md5StateClient);
-
- #ifdef DUMP
-
- logofs = tempfs;
-
- delete cacheDump;
-
- #endif
-
- if (allSaved == -1)
- {
- handleFailOnSave(tempName, "C");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete md5StateClient;
- delete [] md5DigestClient;
-
- EnableSignals();
-
- return NULL;
- }
-
- md5_finish(md5StateClient, md5DigestClient);
-
- for (unsigned int i = 0; i < MD5_LENGTH; i++)
- {
- sprintf(md5String + (i * 2), "%02X", md5DigestClient[i]);
- }
-
- strcpy(fullName, (control -> ProxyMode == proxy_client) ? "C-" : "S-");
-
- strcat(fullName, md5String);
-
- md5_append(md5StateStream, (const md5_byte_t *) fullName, strlen(fullName));
- md5_finish(md5StateStream, md5DigestStream);
-
- //
- // Go to the beginning of file plus
- // the integer where we wrote our
- // proxy version.
- //
-
- cachefs -> seekp(4);
-
- if (PutData(cachefs, md5DigestStream, MD5_LENGTH) < 0)
- {
- handleFailOnSave(tempName, "D");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete md5StateClient;
- delete [] md5DigestClient;
-
- EnableSignals();
-
- return NULL;
- }
-
- delete cachefs;
-
- //
- // Copy the resulting cache name without
- // the path so that we can return it to
- // the caller.
- //
-
- char *cacheName = new char[MD5_LENGTH * 2 + 4];
-
- strcpy(cacheName, fullName);
-
- //
- // Add the path to the full name and move
- // the cache into the path.
- //
-
- strcpy(fullName, savePath);
- strcat(fullName, (control -> ProxyMode == proxy_client) ? "/C-" : "/S-");
- strcat(fullName, md5String);
-
- #ifdef TEST
- *logofs << "Proxy: Renaming cache file from '"
- << tempName << "' to '" << fullName
- << "'.\n" << logofs_flush;
- #endif
-
- rename(tempName, fullName);
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete md5StateClient;
- delete [] md5DigestClient;
-
- //
- // Restore the original handlers.
- //
-
- EnableSignals();
-
- #ifdef TEST
- *logofs << "Proxy: Successfully saved cache file '"
- << cacheName << "'.\n" << logofs_flush;
- #endif
-
- //
- // This must be enabled only for test
- // because it requires that client and
- // server reside on the same machine.
- //
-
- if (control -> PersistentCacheCheckOnShutdown == 1 &&
- control -> ProxyMode == proxy_server)
- {
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: MATCH! Checking if the file '"
- << fullName << "' matches a client cache.\n"
- << logofs_flush;
- #endif
-
- strcpy(fullName, savePath);
- strcat(fullName, "/C-");
- strcat(fullName, md5String);
-
- struct stat fileStat;
-
- if (stat(fullName, &fileStat) != 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't find a client cache "
- << "with name '" << fullName << "'.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't find a client cache "
- << "with name '" << fullName << "'.\n";
-
- HandleShutdown();
- }
-
- #if defined(TEST) || defined(INFO)
- *logofs << "Proxy: MATCH! Client cache '" << fullName
- << "' matches the local cache.\n"
- << logofs_flush;
- #endif
- }
-
- return cacheName;
-}
-
-const char *Proxy::handleLoadAllStores(const char *loadPath, const char *loadName) const
-{
- #ifdef TEST
- *logofs << "Proxy: Going to load content of message stores.\n"
- << logofs_flush;
- #endif
-
- //
- // Until we finish loading cache we
- // must at least ignore any SIGIPE.
- //
-
- DisableSignals();
-
- if (loadPath == NULL || loadName == NULL)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! No path or no file name provided for cache to restore.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": No path or no file name provided for cache to restore.\n";
-
- EnableSignals();
-
- return NULL;
- }
- else if (strlen(loadName) != MD5_LENGTH * 2 + 2)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Bad file name provided for cache to restore.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Bad file name provided for cache to restore.\n";
-
- EnableSignals();
-
- return NULL;
- }
-
- istream *cachefs = NULL;
- char md5String[(MD5_LENGTH * 2) + 2];
- md5_byte_t md5FromFile[MD5_LENGTH];
-
- char *cacheName = new char[strlen(loadPath) + strlen(loadName) + 3];
-
- strcpy(cacheName, loadPath);
- strcat(cacheName, "/");
- strcat(cacheName, loadName);
-
- #ifdef TEST
- *logofs << "Proxy: Name of cache file is '"
- << cacheName << "'.\n" << logofs_flush;
- #endif
-
- cachefs = new ifstream(cacheName, ios::in | ios::binary);
-
- unsigned char version[4];
-
- if (cachefs == NULL || GetData(cachefs, version, 4) < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't read cache file '"
- << cacheName << "'.\n" << logofs_flush;;
- #endif
-
- handleFailOnLoad(cacheName, "A");
-
- delete cachefs;
-
- delete [] cacheName;
-
- EnableSignals();
-
- return NULL;
- }
-
- int major;
- int minor;
- int patch;
-
- if (handleLoadVersion(version, major, minor, patch) < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: WARNING! Incompatible version '"
- << major << "." << minor << "." << patch
- << "' in cache file '" << cacheName
- << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Incompatible version '"
- << major << "." << minor << "." << patch
- << "' in cache file '" << cacheName
- << "'.\n" << logofs_flush;
-
- if (control -> ProxyMode == proxy_server)
- {
- handleFailOnLoad(cacheName, "B");
- }
- else
- {
- //
- // Simply remove the cache file.
- //
-
- unlink(cacheName);
- }
-
- delete cachefs;
-
- delete [] cacheName;
-
- EnableSignals();
-
- return NULL;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Reading from cache file version '"
- << major << "." << minor << "." << patch
- << "'.\n" << logofs_flush;
- #endif
-
- if (GetData(cachefs, md5FromFile, MD5_LENGTH) < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! No checksum in cache file '"
- << loadName << "'.\n" << logofs_flush;
- #endif
-
- handleFailOnLoad(cacheName, "C");
-
- delete cachefs;
-
- delete [] cacheName;
-
- EnableSignals();
-
- return NULL;
- }
-
- md5_state_t *md5StateStream = NULL;
- md5_byte_t *md5DigestStream = NULL;
-
- md5StateStream = new md5_state_t();
- md5DigestStream = new md5_byte_t[MD5_LENGTH];
-
- md5_init(md5StateStream);
-
- //
- // Use the virtual method of the proxy class.
- //
-
- if (handleLoadAllStores(cachefs, md5StateStream) < 0)
- {
- handleFailOnLoad(cacheName, "D");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete [] cacheName;
-
- EnableSignals();
-
- return NULL;
- }
-
- md5_append(md5StateStream, (const md5_byte_t *) loadName, strlen(loadName));
- md5_finish(md5StateStream, md5DigestStream);
-
- for (int i = 0; i < MD5_LENGTH; i++)
- {
- if (md5DigestStream[i] != md5FromFile[i])
- {
- #ifdef PANIC
-
- *logofs << "Proxy: PANIC! Bad checksum for cache file '"
- << cacheName << "'.\n" << logofs_flush;
-
- for (unsigned int i = 0; i < MD5_LENGTH; i++)
- {
- sprintf(md5String + (i * 2), "%02X", md5FromFile[i]);
- }
-
- *logofs << "Proxy: PANIC! Saved checksum is '"
- << md5String << "'.\n" << logofs_flush;
-
- for (unsigned int i = 0; i < MD5_LENGTH; i++)
- {
- sprintf(md5String + (i * 2),"%02X", md5DigestStream[i]);
- }
-
- *logofs << "Proxy: PANIC! Calculated checksum is '"
- << md5String << "'.\n" << logofs_flush;
-
- #endif
-
- handleFailOnLoad(cacheName, "E");
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete [] cacheName;
-
- EnableSignals();
-
- return NULL;
- }
- }
-
- delete cachefs;
-
- delete md5StateStream;
- delete [] md5DigestStream;
-
- delete [] cacheName;
-
- //
- // Restore the original handlers.
- //
-
- EnableSignals();
-
- #ifdef TEST
- *logofs << "Proxy: Successfully loaded cache file '"
- << loadName << "'.\n" << logofs_flush;
- #endif
-
- //
- // Return the string provided by caller.
- //
-
- return loadName;
-}
-
-int Proxy::allocateChannelMap(int fd)
-{
- //
- // We assume that the fd is lower than
- // the maximum allowed number. This is
- // checked at the time the connection
- // is accepted.
- //
-
- if (fd < 0 || fd >= CONNECTIONS_LIMIT)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Internal error allocating "
- << "new channel with FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Internal error allocating "
- << "new channel with FD#" << fd_ << ".\n";
-
- HandleCleanup();
- }
-
- for (int channelId = 0;
- channelId < CONNECTIONS_LIMIT;
- channelId++)
- {
- if (checkLocalChannelMap(channelId) == 1 &&
- fdMap_[channelId] == -1)
- {
- fdMap_[channelId] = fd;
- channelMap_[fd] = channelId;
-
- #ifdef TEST
- *logofs << "Proxy: Allocated new channel ID#"
- << channelId << " with FD#" << fd << ".\n"
- << logofs_flush;
- #endif
-
- return channelId;
- }
- }
-
- //
- // No available channel is remaining.
- //
-
- #ifdef TEST
- *logofs << "Proxy: WARNING! Can't allocate a new channel "
- << "for FD#" << fd_ << ".\n" << logofs_flush;
- #endif
-
- return -1;
-}
-
-int Proxy::checkChannelMap(int channelId)
-{
- //
- // To be acceptable, the channel id must
- // be an id that is not possible to use
- // to allocate channels at this side.
- //
-
- if (checkLocalChannelMap(channelId) == 1)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't open a new channel "
- << "with invalid ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't open a new channel "
- << "with invalid ID#" << channelId << ".\n";
-
- return -1;
- }
- else if (channels_[channelId] != NULL)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't open a new channel "
- << "over an existing ID#" << channelId
- << " with FD#" << getFd(channelId)
- << ".\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't open a new channel "
- << "over an existing ID#" << channelId
- << " with FD#" << getFd(channelId)
- << ".\n";
-
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::assignChannelMap(int channelId, int fd)
-{
- //
- // We assume that the fd is lower than
- // the maximum allowed number. This is
- // checked at the time the connection
- // is accepted.
- //
-
- if (channelId < 0 || channelId >= CONNECTIONS_LIMIT ||
- fd < 0 || fd >= CONNECTIONS_LIMIT)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Internal error assigning "
- << "new channel with FD#" << fd_ << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Internal error assigning "
- << "new channel with FD#" << fd_ << ".\n";
-
- HandleCleanup();
- }
-
- fdMap_[channelId] = fd;
- channelMap_[fd] = channelId;
-
- return 1;
-}
-
-void Proxy::cleanupChannelMap(int channelId)
-{
- int fd = fdMap_[channelId];
-
- if (fd != -1)
- {
- fdMap_[channelId] = -1;
- channelMap_[fd] = -1;
- }
-}
-
-int Proxy::addControlCodes(T_proxy_code code, int data)
-{
- //
- // Flush the encode buffer plus all the outstanding
- // control codes if there is not enough space for
- // the new control message. We need to ensure that
- // there are further bytes available, in the case
- // we will need to add more token control messages.
- //
-
- if (controlLength_ + 3 > CONTROL_CODES_THRESHOLD)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Flushing control messages "
- << "while sending code '" << DumpControl(code)
- << "'.\n" << logofs_flush;
- #endif
-
- if (handleFlush() < 0)
- {
- return -1;
- }
- }
-
- controlCodes_[controlLength_++] = 0;
- controlCodes_[controlLength_++] = (unsigned char) code;
- controlCodes_[controlLength_++] = (unsigned char) (data == -1 ? 0 : data);
-
- //
- // Account for the control frame.
- //
-
- statistics -> addFrameOut();
-
- return 1;
-}
-
-void Proxy::setSplitTimeout(int channelId)
-{
- int needed = channels_[channelId] -> needSplit();
-
- if (needed != isTimestamp(timeouts_.splitTs))
- {
- if (needed == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- *logofs << "Proxy: SPLIT! Allocating split timestamp at "
- << strMsTimestamp() << ".\n" << logofs_flush;
- #endif
-
- timeouts_.splitTs = getTimestamp();
- }
- else
- {
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL &&
- channels_[channelId] -> needSplit() == 1)
- {
- #ifdef TEST
- *logofs << "Proxy: SPLIT! Channel for FD#"
- << getFd(channelId) << " still needs splits.\n"
- << logofs_flush;
- #endif
-
- return;
- }
- }
-
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- *logofs << "Proxy: SPLIT! Resetting split timestamp at "
- << strMsTimestamp() << ".\n" << logofs_flush;
- #endif
-
- timeouts_.splitTs = nullTimestamp();
- }
- }
-}
-
-void Proxy::setMotionTimeout(int channelId)
-{
- int needed = channels_[channelId] -> needMotion();
-
- if (needed != isTimestamp(timeouts_.motionTs))
- {
- if (channels_[channelId] -> needMotion() == 1)
- {
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- *logofs << "Proxy: Allocating motion timestamp at "
- << strMsTimestamp() << ".\n" << logofs_flush;
- #endif
-
- timeouts_.motionTs = getTimestamp();
- }
- else
- {
- T_list &channelList = activeChannels_.getList();
-
- for (T_list::iterator j = channelList.begin();
- j != channelList.end(); j++)
- {
- int channelId = *j;
-
- if (channels_[channelId] != NULL &&
- channels_[channelId] -> needMotion() == 1)
- {
- #ifdef TEST
- *logofs << "Proxy: SPLIT! Channel for FD#"
- << getFd(channelId) << " still needs motions.\n"
- << logofs_flush;
- #endif
-
- return;
- }
- }
-
- #if defined(TEST) || defined(INFO) || defined(SPLIT)
- *logofs << "Proxy: Resetting motion timestamp at "
- << strMsTimestamp() << ".\n" << logofs_flush;
- #endif
-
- timeouts_.motionTs = nullTimestamp();
- }
- }
-}
-
-void Proxy::increaseChannels(int channelId)
-{
- #ifdef TEST
- *logofs << "Proxy: Adding channel " << channelId
- << " to the list of active channels.\n"
- << logofs_flush;
- #endif
-
- activeChannels_.add(channelId);
-
- #ifdef TEST
- *logofs << "Proxy: There are " << activeChannels_.getSize()
- << " allocated channels for proxy FD#" << fd_
- << ".\n" << logofs_flush;
- #endif
-}
-
-void Proxy::decreaseChannels(int channelId)
-{
- #ifdef TEST
- *logofs << "Proxy: Removing channel " << channelId
- << " from the list of active channels.\n"
- << logofs_flush;
- #endif
-
- activeChannels_.remove(channelId);
-
- #ifdef TEST
- *logofs << "Proxy: There are " << activeChannels_.getSize()
- << " allocated channels for proxy FD#" << fd_
- << ".\n" << logofs_flush;
- #endif
-}
-
-int Proxy::allocateTransport(int channelFd, int channelId)
-{
- if (transports_[channelId] == NULL)
- {
- transports_[channelId] = new Transport(channelFd);
-
- if (transports_[channelId] == NULL)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Can't allocate transport for "
- << "channel id " << channelId << ".\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Can't allocate transport for "
- << "channel id " << channelId << ".\n";
-
- return -1;
- }
- }
- else if (transports_[channelId] ->
- getType() != transport_agent)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Transport for channel id "
- << channelId << " should be null.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Transport for channel id "
- << channelId << " should be null.\n";
-
- return -1;
- }
-
- return 1;
-}
-
-int Proxy::deallocateTransport(int channelId)
-{
- //
- // Transport for the agent connection
- // is passed from the outside when
- // creating the channel.
- //
-
- if (transports_[channelId] ->
- getType() != transport_agent)
- {
- delete transports_[channelId];
- }
-
- transports_[channelId] = NULL;
-
- return 1;
-}
-
-int Proxy::handleNewGenericConnection(int clientFd, T_channel_type type, const char *label)
-{
- int channelId = allocateChannelMap(clientFd);
-
- if (channelId == -1)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Maximum number of available "
- << "channels exceeded.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Maximum number of available "
- << "channels exceeded.\n";
-
- return -1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Channel for " << label << " descriptor "
- << "FD#" << clientFd << " mapped to ID#"
- << channelId << ".\n"
- << logofs_flush;
- #endif
-
- //
- // Turn queuing off for path server-to-proxy.
- //
-
- SetNoDelay(clientFd, 1);
-
- if (allocateTransport(clientFd, channelId) < 0)
- {
- return -1;
- }
-
- switch (type)
- {
- case channel_cups:
- {
- channels_[channelId] = new CupsChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_smb:
- {
- channels_[channelId] = new SmbChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_media:
- {
- channels_[channelId] = new MediaChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_http:
- {
- channels_[channelId] = new HttpChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_font:
- {
- channels_[channelId] = new FontChannel(transports_[channelId], compressor_);
-
- break;
- }
- default:
- {
- channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_);
-
- break;
- }
- }
-
- if (channels_[channelId] == NULL)
- {
- deallocateTransport(channelId);
-
- return -1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Accepted new connection to "
- << label << " server.\n" << logofs_flush;
- #endif
-
- cerr << "Info" << ": Accepted new connection to "
- << label << " server.\n";
-
- increaseChannels(channelId);
-
- switch (type)
- {
- case channel_cups:
- {
- if (handleControl(code_new_cups_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- case channel_smb:
- {
- if (handleControl(code_new_smb_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- case channel_media:
- {
- if (handleControl(code_new_media_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- case channel_http:
- {
- if (handleControl(code_new_http_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- case channel_font:
- {
- if (handleControl(code_new_font_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- default:
- {
- if (handleControl(code_new_slave_connection, channelId) < 0)
- {
- return -1;
- }
-
- break;
- }
- }
-
- channels_[channelId] -> handleConfiguration();
-
- return 1;
-}
-
-int Proxy::handleNewSlaveConnection(int clientFd)
-{
- // Since ProtoStep7 (#issue 108)
- return handleNewGenericConnection(clientFd, channel_slave, "slave");
-}
-
-
-
-int Proxy::handleNewGenericConnectionFromProxy(int channelId, T_channel_type type,
- ChannelEndPoint &endPoint, const char *label)
-{
- char *unixPath, *host;
- long port;
-
- if (endPoint.getUnixPath(&unixPath)) {
- return handleNewGenericConnectionFromProxyUnix(channelId, type, unixPath, label);
- }
-
- if (endPoint.getTCPHostAndPort(&host, &port)) {
- return handleNewGenericConnectionFromProxyTCP(channelId, type, host, port, label);
- }
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Refusing attempted connection "
- << "to " << label << " server.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Refusing attempted connection "
- << "to " << label << " server.\n";
-
- return -1;
-}
-
-int Proxy::handleNewGenericConnectionFromProxyTCP(int channelId, T_channel_type type,
- const char *hostname, long port, const char *label)
-
-{
- if (port <= 0)
- {
- //
- // This happens when user has disabled
- // forwarding of the specific service.
- //
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Refusing attempted connection "
- << "to " << label << " server.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Refusing attempted connection "
- << "to " << label << " server.\n";
-
- return -1;
- }
-
- const char *serverHost = hostname;
- int serverAddrFamily = AF_INET;
- sockaddr *serverAddr = NULL;
- unsigned int serverAddrLength = 0;
-
- #ifdef TEST
- *logofs << "Proxy: Connecting to " << label
- << " server '" << serverHost << "' on TCP port '"
- << port << "'.\n" << logofs_flush;
- #endif
-
- int serverIPAddr = GetHostAddress(serverHost);
-
- if (serverIPAddr == 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Unknown " << label
- << " server host '" << serverHost << "'.\n"
- << logofs_flush;
- #endif
-
- cerr << "Error" << ": Unknown " << label
- << " server host '" << serverHost
- << "'.\n";
-
- return -1;
- }
-
- sockaddr_in *serverAddrTCP = new sockaddr_in;
-
- serverAddrTCP -> sin_family = AF_INET;
- serverAddrTCP -> sin_port = htons(port);
- serverAddrTCP -> sin_addr.s_addr = serverIPAddr;
-
- serverAddr = (sockaddr *) serverAddrTCP;
- serverAddrLength = sizeof(sockaddr_in);
-
- //
- // Connect to the requested server.
- //
-
- int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC);
-
- if (serverFd < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Call to socket failed. "
- << "Error is " << EGET() << " '" << ESTR()
- << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Call to socket failed. "
- << "Error is " << EGET() << " '" << ESTR()
- << "'.\n";
-
- delete serverAddrTCP;
-
- return -1;
- }
- else if (connect(serverFd, serverAddr, serverAddrLength) < 0)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Connection to " << label
- << " server '" << serverHost << ":" << port
- << "' failed with error '" << ESTR() << "'.\n"
- << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Connection to " << label
- << " server '" << serverHost << ":" << port
- << "' failed with error '" << ESTR() << "'.\n";
-
- close(serverFd);
-
- delete serverAddrTCP;
-
- return -1;
- }
-
- delete serverAddrTCP;
-
- if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0)
- {
- return -1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Forwarded new connection to "
- << label << " server on port '" << port
- << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Info" << ": Forwarded new connection to "
- << label << " server on port '" << port
- << "'.\n";
-
- return 1;
-}
-
-int Proxy::handleNewGenericConnectionFromProxyUnix(int channelId, T_channel_type type,
- const char *path, const char *label)
-{
- if (path == NULL || *path == '\0' )
- {
- //
- // This happens when user has disabled
- // forwarding of the specific service.
- //
-
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Refusing attempted connection "
- << "to " << label << " server.\n" << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Refusing attempted connection "
- << "to " << label << " server.\n";
-
- return -1;
- }
-
- sockaddr_un serverAddrUnix;
-
- unsigned int serverAddrLength = sizeof(sockaddr_un);
-
- int serverAddrFamily = AF_UNIX;
-
- serverAddrUnix.sun_family = AF_UNIX;
-
- const int serverAddrNameLength = 108;
-
- strncpy(serverAddrUnix.sun_path, path, serverAddrNameLength);
-
- *(serverAddrUnix.sun_path + serverAddrNameLength - 1) = '\0';
-
- #ifdef TEST
- *logofs << "Proxy: Connecting to " << label << " server "
- << "on Unix port '" << path << "'.\n" << logofs_flush;
- #endif
-
- //
- // Connect to the requested server.
- //
-
- int serverFd = socket(serverAddrFamily, SOCK_STREAM, PF_UNSPEC);
-
- if (serverFd < 0)
- {
- #ifdef PANIC
- *logofs << "Proxy: PANIC! Call to socket failed. "
- << "Error is " << EGET() << " '" << ESTR()
- << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Error" << ": Call to socket failed. "
- << "Error is " << EGET() << " '" << ESTR()
- << "'.\n";
-
- return -1;
- }
- else if (connect(serverFd, (sockaddr *) &serverAddrUnix, serverAddrLength) < 0)
- {
- #ifdef WARNING
- *logofs << "Proxy: WARNING! Connection to " << label
- << " server on Unix port '" << path << "' failed "
- << "with error " << EGET() << ", '" << ESTR() << "'.\n"
- << logofs_flush;
- #endif
-
- cerr << "Warning" << ": Connection to " << label
- << " server on Unix port '" << path << "' failed "
- << "with error " << EGET() << ", '" << ESTR() << "'.\n";
-
- close(serverFd);
-
- return -1;
- }
-
- if (handlePostConnectionFromProxy(channelId, serverFd, type, label) < 0)
- {
- return -1;
- }
-
- #ifdef TEST
- *logofs << "Proxy: Forwarded new connection to "
- << label << " server on Unix port '" << path
- << "'.\n" << logofs_flush;
- #endif
-
- cerr << "Info" << ": Forwarded new connection to "
- << label << " server on Unix port '" << path
- << "'.\n";
-
- return 1;
-}
-
-int Proxy::handleNewSlaveConnectionFromProxy(int channelId)
-{
-
- cerr << "Info" << ": New slave connection on "
- << "channel ID#" << channelId << "\n";
-
- char *nx_slave_cmd = getenv("NX_SLAVE_CMD");
- if (nx_slave_cmd == NULL) {
- return -1;
- }
-
- int spair[2];
- if (socketpair(AF_UNIX, SOCK_STREAM, 0, spair) == -1) {
- perror("socketpair");
- return -1;
- }
-
- int serverFd = spair[0];
- int clientFd = spair[1];
-
- if (handlePostConnectionFromProxy(channelId, serverFd, channel_slave, "slave") < 0)
- {
- close(serverFd);
- close(clientFd);
- return -1;
- }
-
-
- int pid = fork();
- if (pid == 0)
- {
-
- if (dup2(clientFd, 0) == -1)
- {
- perror("dup2");
- exit(1);
- }
-
- if (dup2(clientFd, 1) == -1)
- {
- perror("dup2");
- exit(1);
- }
-
- close(serverFd);
- close(clientFd);
-
- /* Close FDs used by NX, QVD #1208 */
- for (int fd = 3; fd < 256; fd++) {
- close(fd);
- }
-
- char *const argv[2] = {nx_slave_cmd, NULL};
-
- if (execv(nx_slave_cmd, argv) == -1)
- {
- perror("execv");
- }
- exit(1);
-
- }
- else if (pid == -1)
- {
- // TODO Test this!
- perror("fork");
- close(serverFd);
- close(clientFd);
- return -1;
- }
-
- close(clientFd);
- slavePidMap_[channelId] = pid;
-
- cerr << "Info" << ": slave channel ID#" << channelId << " handler has PID " << pid << endl;
-
- return 1;
-}
-
-void Proxy::checkSlaves()
-{
- for (int channelId = 0; channelId<CONNECTIONS_LIMIT; channelId++)
- {
- int pid = slavePidMap_[channelId];
-
- if (pid > 1 && HandleChild(pid))
- {
- slavePidMap_[channelId] = nothing;
- cerr << "Info:" << " Handled death of slave with pid " << pid << endl;
- }
- }
-}
-
-int Proxy::handlePostConnectionFromProxy(int channelId, int serverFd,
- T_channel_type type, const char *label)
-{
- //
- // Turn queuing off for path proxy-to-server.
- //
-
- SetNoDelay(serverFd, 1);
-
- assignChannelMap(channelId, serverFd);
-
- #ifdef TEST
- *logofs << "Proxy: Descriptor FD#" << serverFd
- << " mapped to channel ID#" << channelId << ".\n"
- << logofs_flush;
- #endif
-
- if (allocateTransport(serverFd, channelId) < 0)
- {
- return -1;
- }
-
- switch (type)
- {
- case channel_cups:
- {
- channels_[channelId] = new CupsChannel(transports_[channelId], compressor_);
- break;
- }
- case channel_smb:
- {
- channels_[channelId] = new SmbChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_media:
- {
- channels_[channelId] = new MediaChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_http:
- {
- channels_[channelId] = new HttpChannel(transports_[channelId], compressor_);
-
- break;
- }
- case channel_font:
- {
- channels_[channelId] = new FontChannel(transports_[channelId], compressor_);
-
- break;
- }
- default:
- {
- channels_[channelId] = new SlaveChannel(transports_[channelId], compressor_);
-
- break;
- }
- }
-
- if (channels_[channelId] == NULL)
- {
- deallocateTransport(channelId);
-
- return -1;
- }
-
- increaseChannels(channelId);
-
- channels_[channelId] -> handleConfiguration();
-
- return 1;
-}