aboutsummaryrefslogtreecommitdiff
path: root/nxcomp/src/GenericChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'nxcomp/src/GenericChannel.cpp')
-rw-r--r--nxcomp/src/GenericChannel.cpp495
1 files changed, 495 insertions, 0 deletions
diff --git a/nxcomp/src/GenericChannel.cpp b/nxcomp/src/GenericChannel.cpp
new file mode 100644
index 000000000..877412cee
--- /dev/null
+++ b/nxcomp/src/GenericChannel.cpp
@@ -0,0 +1,495 @@
+/**************************************************************************/
+/* */
+/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */
+/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */
+/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */
+/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */
+/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
+/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */
+/* */
+/* NXCOMP, NX protocol compression and NX extensions to this software */
+/* are copyright of the aforementioned persons and companies. */
+/* */
+/* Redistribution and use of the present software is allowed according */
+/* to terms specified in the file LICENSE.nxcomp which comes in the */
+/* source distribution. */
+/* */
+/* All rights reserved. */
+/* */
+/* NOTE: This software has received contributions from various other */
+/* contributors, only the core maintainers and supporters are listed as */
+/* copyright holders. Please contact us, if you feel you should be listed */
+/* as copyright holder, as well. */
+/* */
+/**************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "GenericChannel.h"
+
+#include "EncodeBuffer.h"
+#include "DecodeBuffer.h"
+
+#include "StaticCompressor.h"
+
+#include "Statistics.h"
+#include "Proxy.h"
+
+extern Proxy *proxy;
+
+//
+// Set the verbosity level.
+//
+
+#define PANIC
+#define WARNING
+#undef TEST
+#undef DEBUG
+
+//
+// Log the important tracepoints related
+// to writing packets to the peer proxy.
+//
+
+#undef FLUSH
+
+//
+// Define this to log when a channel
+// is created or destroyed.
+//
+
+#undef REFERENCES
+
+//
+// Here are the static members.
+//
+
+#ifdef REFERENCES
+
+int GenericChannel::references_ = 0;
+
+#endif
+
+GenericChannel::GenericChannel(Transport *transport, StaticCompressor *compressor)
+
+ : Channel(transport, compressor), readBuffer_(transport_, this)
+{
+ #ifdef REFERENCES
+ *logofs << "GenericChannel: Created new object at "
+ << this << " for FD#" << fd_ << " out of "
+ << ++references_ << " allocated channels.\n"
+ << logofs_flush;
+ #endif
+}
+
+GenericChannel::~GenericChannel()
+{
+ #ifdef REFERENCES
+ *logofs << "GenericChannel: Deleted object at "
+ << this << " for FD#" << fd_ << " out of "
+ << --references_ << " allocated channels.\n"
+ << logofs_flush;
+ #endif
+}
+
+//
+// Beginning of handleRead().
+//
+
+int GenericChannel::handleRead(EncodeBuffer &encodeBuffer, const unsigned char *message,
+ unsigned int length)
+{
+ #ifdef TEST
+ *logofs << "handleRead: Called for FD#" << fd_
+ << " with " << encodeBuffer.getLength()
+ << " bytes already encoded.\n"
+ << logofs_flush;
+ #endif
+
+ //
+ // Pointer to located message and
+ // its size in bytes.
+ //
+
+ const unsigned char *inputMessage;
+ unsigned int inputLength;
+
+ //
+ // Tag message as generic data in compression
+ // routine. Opcode is not actually transferred
+ // over the network.
+ //
+
+ unsigned char inputOpcode = X_NXInternalGenericData;
+
+ #if defined(TEST) || defined(INFO)
+ *logofs << "handleRead: Trying to read from FD#"
+ << fd_ << " at " << strMsTimestamp() << ".\n"
+ << logofs_flush;
+ #endif
+
+ int result = readBuffer_.readMessage();
+
+ #ifdef DEBUG
+ *logofs << "handleRead: Read result on FD#" << fd_
+ << " is " << result << ".\n"
+ << logofs_flush;
+ #endif
+
+ if (result < 0)
+ {
+ //
+ // Let the proxy close the channel.
+ //
+
+ return -1;
+ }
+ else if (result == 0)
+ {
+ #if defined(TEST) || defined(INFO)
+
+ *logofs << "handleRead: PANIC! No data read from FD#"
+ << fd_ << " while encoding messages.\n"
+ << logofs_flush;
+
+ HandleCleanup();
+
+ #endif
+
+ return 0;
+ }
+
+ #if defined(TEST) || defined(INFO) || defined(FLUSH)
+ *logofs << "handleRead: Encoding messages for FD#" << fd_
+ << " with " << readBuffer_.getLength() << " bytes "
+ << "in the buffer.\n" << logofs_flush;
+ #endif
+
+ //
+ // Divide the available data in multiple
+ // messages and encode them one by one.
+ //
+
+ if (proxy -> handleAsyncSwitch(fd_) < 0)
+ {
+ return -1;
+ }
+
+ while ((inputMessage = readBuffer_.getMessage(inputLength)) != NULL)
+ {
+ encodeBuffer.encodeValue(inputLength, 32, 14);
+
+ if (isCompressed() == 1)
+ {
+ unsigned int compressedDataSize = 0;
+ unsigned char *compressedData = NULL;
+
+ if (handleCompress(encodeBuffer, inputOpcode, 0,
+ inputMessage, inputLength, compressedData,
+ compressedDataSize) < 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ encodeBuffer.encodeMemory(inputMessage, inputLength);
+ }
+
+ int bits = encodeBuffer.diffBits();
+
+ #if defined(TEST) || defined(OPCODES)
+ *logofs << "handleRead: Handled generic data for FD#" << fd_
+ << ". " << inputLength << " bytes in, " << bits << " bits ("
+ << ((float) bits) / 8 << " bytes) out.\n" << logofs_flush;
+ #endif
+
+ addProtocolBits(inputLength << 3, bits);
+
+ if (isPrioritized() == 1)
+ {
+ priority_++;
+ }
+
+ } // End of while ((inputMessage = readBuffer_.getMessage(inputLength)) != NULL) ...
+
+ //
+ // All data has been read from the read buffer.
+ // We still need to mark the end of the encode
+ // buffer just before sending the frame. This
+ // allows us to accommodate multiple reads in
+ // a single frame.
+ //
+
+ if (priority_ > 0)
+ {
+ #if defined(TEST) || defined(INFO)
+ *logofs << "handleRead: WARNING! Requesting flush "
+ << "because of " << priority_ << " prioritized "
+ << "messages for FD#" << fd_ << ".\n"
+ << logofs_flush;
+ #endif
+
+ if (proxy -> handleAsyncPriority() < 0)
+ {
+ return -1;
+ }
+
+ //
+ // Reset the priority flag.
+ //
+
+ priority_ = 0;
+ }
+
+ //
+ // Flush if we produced enough data.
+ //
+
+ if (proxy -> canAsyncFlush() == 1)
+ {
+ #if defined(TEST) || defined(INFO)
+ *logofs << "handleRead: WARNING! Requesting flush "
+ << "because of enough data or timeout on the "
+ << "proxy link.\n" << logofs_flush;
+ #endif
+
+ if (proxy -> handleAsyncFlush() < 0)
+ {
+ return -1;
+ }
+ }
+
+ #if defined(TEST) || defined(INFO)
+
+ if (transport_ -> pending() != 0 ||
+ readBuffer_.checkMessage() != 0)
+ {
+ *logofs << "handleRead: PANIC! Buffer for X descriptor FD#"
+ << fd_ << " has " << transport_ -> pending()
+ << " bytes to read.\n" << logofs_flush;
+
+ HandleCleanup();
+ }
+
+ #endif
+
+ //
+ // Reset the read buffer.
+ //
+
+ readBuffer_.fullReset();
+
+ return 1;
+}
+
+//
+// End of handleRead().
+//
+
+//
+// Beginning of handleWrite().
+//
+
+int GenericChannel::handleWrite(const unsigned char *message, unsigned int length)
+{
+ #ifdef TEST
+ *logofs << "handleWrite: Called for FD#" << fd_ << ".\n" << logofs_flush;
+ #endif
+
+ //
+ // Create the buffer from which to
+ // decode messages.
+ //
+
+ DecodeBuffer decodeBuffer(message, length);
+
+ #if defined(TEST) || defined(INFO) || defined(FLUSH)
+ *logofs << "handleWrite: Decoding messages for FD#" << fd_
+ << " with " << length << " bytes in the buffer.\n"
+ << logofs_flush;
+ #endif
+
+ unsigned char *outputMessage;
+ unsigned int outputLength;
+
+ //
+ // Tag message as generic data
+ // in decompression.
+ //
+
+ unsigned char outputOpcode = X_NXInternalGenericData;
+
+ for (;;)
+ {
+ decodeBuffer.decodeValue(outputLength, 32, 14);
+
+ if (outputLength == 0)
+ {
+ break;
+ }
+
+ if (isCompressed() == 1)
+ {
+ if (writeBuffer_.getAvailable() < outputLength ||
+ (int) outputLength >= control -> TransportFlushBufferSize)
+ {
+ #ifdef DEBUG
+ *logofs << "handleWrite: Using scratch buffer for "
+ << "generic data with size " << outputLength << " and "
+ << writeBuffer_.getLength() << " bytes in buffer.\n"
+ << logofs_flush;
+ #endif
+
+ outputMessage = writeBuffer_.addScratchMessage(outputLength);
+ }
+ else
+ {
+ outputMessage = writeBuffer_.addMessage(outputLength);
+ }
+
+ const unsigned char *compressedData = NULL;
+ unsigned int compressedDataSize = 0;
+
+ int decompressed = handleDecompress(decodeBuffer, outputOpcode, 0,
+ outputMessage, outputLength, compressedData,
+ compressedDataSize);
+ if (decompressed < 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ #ifdef DEBUG
+ *logofs << "handleWrite: Using scratch buffer for "
+ << "generic data with size " << outputLength << " and "
+ << writeBuffer_.getLength() << " bytes in buffer.\n"
+ << logofs_flush;
+ #endif
+
+ writeBuffer_.addScratchMessage((unsigned char *)
+ decodeBuffer.decodeMemory(outputLength), outputLength);
+ }
+
+ #if defined(TEST) || defined(OPCODES)
+ *logofs << "handleWrite: Handled generic data for FD#" << fd_
+ << ". " << outputLength << " bytes out.\n"
+ << logofs_flush;
+ #endif
+
+ handleFlush(flush_if_needed);
+ }
+
+ //
+ // Write any remaining data to socket.
+ //
+
+ if (handleFlush(flush_if_any) < 0)
+ {
+ return -1;
+ }
+
+ return 1;
+}
+
+//
+// End of handleWrite().
+//
+
+//
+// Other members.
+//
+
+int GenericChannel::handleCompletion(EncodeBuffer &encodeBuffer)
+{
+ //
+ // Add the bits telling to the remote
+ // that all data in the frame has been
+ // encoded.
+ //
+
+ if (encodeBuffer.getLength() > 0)
+ {
+ #if defined(TEST) || defined(INFO)
+ *logofs << "handleCompletion: Writing completion bits with "
+ << encodeBuffer.getLength() << " bytes encoded "
+ << "for FD#" << fd_ << ".\n" << logofs_flush;
+ #endif
+
+ encodeBuffer.encodeValue(0, 32, 14);
+
+ return 1;
+ }
+ #if defined(TEST) || defined(INFO)
+ else
+ {
+ *logofs << "handleCompletion: PANIC! No completion to write "
+ << "for FD#" << fd_ << ".\n" << logofs_flush;
+
+ HandleCleanup();
+ }
+ #endif
+
+ return 0;
+}
+
+int GenericChannel::handleConfiguration()
+{
+ #ifdef TEST
+ *logofs << "GenericChannel: Setting new buffer parameters.\n"
+ << logofs_flush;
+ #endif
+
+ readBuffer_.setSize(control -> GenericInitialReadSize,
+ control -> GenericMaximumBufferSize);
+
+ writeBuffer_.setSize(control -> TransportGenericBufferSize,
+ control -> TransportGenericBufferThreshold,
+ control -> TransportMaximumBufferSize);
+
+ transport_ -> setSize(control -> TransportGenericBufferSize,
+ control -> TransportGenericBufferThreshold,
+ control -> TransportMaximumBufferSize);
+
+ return 1;
+}
+
+int GenericChannel::handleFinish()
+{
+ #ifdef TEST
+ *logofs << "GenericChannel: Finishing channel for FD#"
+ << fd_ << ".\n" << logofs_flush;
+ #endif
+
+ congestion_ = 0;
+ priority_ = 0;
+
+ finish_ = 1;
+
+ transport_ -> fullReset();
+
+ return 1;
+}
+
+int GenericChannel::setReferences()
+{
+ #ifdef TEST
+ *logofs << "GenericChannel: Initializing the static "
+ << "members for the generic channels.\n"
+ << logofs_flush;
+ #endif
+
+ #ifdef REFERENCES
+
+ references_ = 0;
+
+ #endif
+
+ return 1;
+}