/**************************************************************************/ /* */ /* Copyright (c) 2001, 2010 NoMachine, http://www.nomachine.com/. */ /* */ /* NXCOMP, NX protocol compression and NX extensions to this software */ /* are copyright of NoMachine. Redistribution and use of the present */ /* software is allowed according to terms specified in the file LICENSE */ /* which comes in the source distribution. */ /* */ /* Check http://www.nomachine.com/licensing.html for applicability. */ /* */ /* NX and NoMachine are trademarks of Medialogic S.p.A. */ /* */ /* All rights reserved. */ /* */ /**************************************************************************/ #ifndef Message_H #define Message_H #include <X11/Xproto.h> #include "NXproto.h" #include "Misc.h" #include "Control.h" #include "Types.h" #include "Timestamp.h" #include "ActionCache.h" #include "StaticCompressor.h" // // Forward class declarations. // class ChannelCache; class EncodeBuffer; class DecodeBuffer; class WriteBuffer; // // Set the verbosity level. // #define PANIC #define WARNING #undef TEST #undef DEBUG // // Define this to know how many messages // are allocated and deallocated. // #undef REFERENCES // // Set default values. We limit the maximum // size of a request to 262144 but we need to // consider the replies, whose size may be up // to 4MB. // #define MESSAGE_ENABLE_CACHE 0 #define MESSAGE_ENABLE_DATA 0 #define MESSAGE_ENABLE_SPLIT 0 #define MESSAGE_ENABLE_COMPRESS 0 #define MESSAGE_DATA_LIMIT 4194304 - 4 #define MESSAGE_DATA_OFFSET 4 #define MESSAGE_CACHE_SLOTS 6000 #define MESSAGE_CACHE_THRESHOLD 50 #define MESSAGE_CACHE_LOWER_THRESHOLD 5 // // Base message class. // class Message { friend class MessageStore; friend class RenderExtensionStore; public: Message() { hits_ = 0; last_ = 0; locks_ = 0; size_ = 0; c_size_ = 0; md5_digest_ = NULL; #ifdef REFERENCES references_++; *logofs << "Message: Created new message at " << this << " out of " << references_ << " allocated messages.\n" << logofs_flush; #endif } Message(const Message &message) { size_ = message.size_; c_size_ = message.c_size_; i_size_ = message.i_size_; hits_ = message.hits_; last_ = message.last_; locks_ = message.locks_; data_ = message.data_; #ifdef REFERENCES references_++; *logofs << "Message: Creating new copied message at " << this << " out of " << references_ << " allocated messages.\n" << logofs_flush; #endif if (message.md5_digest_ != NULL) { md5_digest_ = new md5_byte_t[MD5_LENGTH]; memcpy(md5_digest_, message.md5_digest_, MD5_LENGTH); #ifdef DEBUG *logofs << "Message: Created MD5 digest for object at " << this << ".\n" << logofs_flush; #endif } else { md5_digest_ = NULL; } } ~Message() { #ifdef DEBUG if (md5_digest_ != NULL) { *logofs << "Message: Deleted MD5 digest for object at " << this << ".\n" << logofs_flush; } #endif delete [] md5_digest_; #ifdef REFERENCES references_--; *logofs << "Message: Deleted message at " << this << " out of " << references_ << " allocated messages.\n" << logofs_flush; #endif } // // This is the original message size // including the data part regardless // data is still stored in the object. // int size_; // // This is the size of the identity. // int i_size_; // // This is the size, including identity, // after message has been 'updated' to // reflect storage of data in compressed // format. // int c_size_; protected: // // This is the data part. // T_data data_; // // Time of last hit. // time_t last_; // // This is the number of cache hits // registered for the object. // short int hits_; // // This is used to mark messages // that have been split. // short int locks_; // // This is the MD5 checksum. // md5_byte_t *md5_digest_; // // Keep a reference counter // of allocated objects. // #ifdef REFERENCES static int references_; #endif }; // // Repository of messages. // class MessageStore { public: // // Enable or disable cache of messages in store. // int enableCache; // // Does message have a distinct data part. // int enableData; // // Enable or disable split of data part. // int enableSplit; // // Enable or disable compression of data part. // int enableCompress; // // Set starting point of data part in the message. // int dataOffset; // // Set maximum size for the data part of each message. // int dataLimit; // // Set maximum elements in cache. // int cacheSlots; // // Set the percentage of total cache memory which // a given type of message is allowed to occupy. // When threshold is exceeded store is cleaned to // make room for a new message of the same type. // int cacheThreshold; // // Don't clean the store if percentage of cache // memory occupied by messages of this type is // below the threshold. // int cacheLowerThreshold; // // Last operation performed on cache. // T_store_action lastAction; // // Position of last element stored in cache. // short int lastAdded; // // Positions of last element found in cache. // short int lastHit; // // Position of last element erased. // short int lastRemoved; // // Used to encode the the action to // perform on the store and the slot // involved. // ActionCache lastActionCache; // // Position in cache where next insertion // is going to take place. // short int lastRated; // // Constructors and destructors. // public: MessageStore(StaticCompressor *compressor = NULL); virtual ~MessageStore(); virtual const char *name() const = 0; virtual unsigned char opcode() const = 0; virtual unsigned int storage() const = 0; // // These are members that must be specialized. // public: virtual Message *create() const = 0; virtual Message *create(const Message &message) const = 0; virtual void destroy(Message *message) const = 0; void validateSize(int size) { if (size < control -> MinimumMessageSize || size > control -> MaximumMessageSize) { *logofs << name() << ": PANIC! Invalid size " << size << " for message.\n" << logofs_flush; cerr << "Error" << ": Invalid size " << size << " for message opcode " << opcode() << ".\n"; HandleAbort(); } } void validateSize(int dataSize, int compressedDataSize) { if (dataSize < 0 || dataSize > control -> MaximumMessageSize - 4 || compressedDataSize < 0 || compressedDataSize >= dataSize) { *logofs << name() << ": PANIC! Invalid data size " << dataSize << " and compressed data size " << compressedDataSize << " for message.\n" << logofs_flush; cerr << "Error" << ": Invalid data size " << dataSize << " and compressed data size " << compressedDataSize << " for message " << "opcode " << (unsigned) opcode() << ".\n"; HandleAbort(); } } // // Determine if the message can be stored // in the cache. // virtual int validateMessage(const unsigned char *buffer, int size) { return (size >= control -> MinimumMessageSize && size <= control -> MaximumMessageSize); } // // Get data offset based on major and minor // opcode of the message. // virtual int identitySize(const unsigned char *buffer, unsigned int size) { return dataOffset; } // // Encode identity and data using the // specific message encoding. // // Some messages do not implement these // methods because the encoding is done // directly in the channel loop. Should // move the encoding methods in in the // message classes. // virtual int encodeIdentity(EncodeBuffer &encodeBuffer, const unsigned char *buffer, unsigned int size, int bigEndian, ChannelCache *channelCache) const { return 1; } virtual int decodeIdentity(DecodeBuffer &decodeBuffer, unsigned char *&buffer, unsigned int &size, int bigEndian, WriteBuffer *writeBuffer, ChannelCache *channelCache) const { return 1; } // // Encode differences between message // in cache and the one to be encoded. // virtual void updateIdentity(EncodeBuffer &encodeBuffer, const Message *message, const Message *cachedMessage, ChannelCache *channelCache) const { } // // Decode differences and update the // cached version of the same message. // virtual void updateIdentity(DecodeBuffer &decodeBuffer, const Message *message, ChannelCache *channelCache) const { } // // Post process the message information // contained in the store by either up- // dating the size record or the actual // data part once the message has been // completely sent to our peer. // void updateData(const int position, unsigned int dataSize, unsigned int compressedDataSize); void updateData(const T_checksum checksum, unsigned int compressedDataSize); void updateData(const int position, const unsigned char *newData, unsigned int dataSize, unsigned int compressedDataSize); // // These members, used internally // in the message store class, are // mandatory. // protected: virtual int parseIdentity(Message *message, const unsigned char *buffer, unsigned int size, int bigEndian) const = 0; virtual int unparseIdentity(const Message *message, unsigned char *buffer, unsigned int size, int bigEndian) const = 0; virtual void identityChecksum(const Message *message, const unsigned char *buffer, unsigned int size, int bigEndian) const = 0; virtual void dumpIdentity(const Message *message) const = 0; // // Design should preserve these from being // virtual. // int parseData(Message *message, int split, const unsigned char *buffer, unsigned int size, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); int parseData(Message *message, const unsigned char *buffer, unsigned int size, const unsigned char *compressedData, const unsigned int compressedDataSize, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); int unparseData(const Message *message, unsigned char *buffer, unsigned int size, int bigEndian); // // Manage efficient allocation of messages // in the heap. // void recycle(Message *message) { #ifdef TEST if (message == NULL) { *logofs << name() << ": PANIC! Cannot recycle a null message.\n" << logofs_flush; cerr << "Error" << ": Cannot recycle a null message.\n"; HandleAbort(); } #endif if (temporary_ == NULL) { // // Be careful when reusing the message as // it can contain valid data that must be // explicitly deallocated if not needed. // Note also that you cannot count on the // initialization made in costructor. // temporary_ = message; } else { destroy(message); } } void beginChecksum(Message *message) { if (message -> md5_digest_ == NULL) { message -> md5_digest_ = new md5_byte_t[MD5_LENGTH]; #ifdef DEBUG *logofs << name() << ": Created MD5 digest structure " << "for object at " << message << ".\n" << logofs_flush; #endif } #ifdef DEBUG else { *logofs << name() << ": Using existing MD5 digest structure " << "for object at " << message << ".\n" << logofs_flush; } #endif #ifdef DEBUG *logofs << name() << ": Prepared MD5 digest for object at " << message << ".\n" << logofs_flush; #endif md5_init(md5_state_); } void endChecksum(Message *message) { md5_finish(md5_state_, message -> md5_digest_); #ifdef DEBUG *logofs << name() << ": Calculated checksum for object at " << message << ".\n" << logofs_flush; #endif } void dataChecksum(Message *message, const unsigned char *buffer, unsigned int size, int bigEndian) { // // Messages that have a data part starting // at an offset possibly beyond the end of // the message, must include the size in // the identity checksum. // if ((int) size > message -> i_size_) { md5_append(md5_state_, buffer + message -> i_size_, size - message -> i_size_); } } // // Repository handling methods. // public: // // Extract identity and data from buffer. // The size field will be updated at the // time of data parsing. // int parse(Message *message, int split, const unsigned char *buffer, unsigned int size, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); int parse(Message *message, const unsigned char *buffer, unsigned int size, const unsigned char *compressedData, const unsigned int compressedDataSize, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); // // From identity and data write the // final message to the raw buffer. // int unparse(const Message *message, unsigned char *buffer, unsigned int size, int bigEndian) { return (unparseData(message, buffer, size, bigEndian) && unparseIdentity(message, buffer, size, bigEndian)); } void dump(const Message *message) const { dumpIdentity(message); dumpData(message); } void dumpData(const Message *message) const; // // This returns the original message size as it // was received on the link. It takes in account // the data part, regardless data is still stored // in the message object. This information will // be used at the time message is unparsed. // int plainSize(const int position) const { return (*messages_)[position] -> size_; } // // This returns either the size of identity plus // the compressed data part or 0 if message is // stored in uncompressed format. // int compressedSize(const int position) const { return (*messages_)[position] -> c_size_; } // // Returns a pointer to message // given its position in cache. // Message *get(const int position) const { if (position < 0 || position >= cacheSlots) { #ifdef PANIC *logofs << name() << ": PANIC! Requested position " << position << " is not inside the " << "container.\n" << logofs_flush; #endif cerr << "Error" << ": Requested position " << position << " is not inside the" << "container.\n"; HandleAbort(); } else if ((*messages_)[position] == NULL) { #ifdef PANIC *logofs << name() << ": PANIC! Message at position " << position << " is NULL.\n" << logofs_flush; #endif cerr << "Error" << ": Message at position " << position << " is NULL.\n"; HandleAbort(); } return (*messages_)[position]; } // // This is the method called at encoding // side to add a message to cache. // int findOrAdd(Message *message, T_checksum_action checksumAction, T_data_action dataAction, int &added, int &locked); // // Utility interfaces to message insertion // and deletion. // int add(Message *message, const int position, T_checksum_action checksumAction, T_data_action dataAction); int remove(const int position, T_checksum_action checksumAction, T_data_action dataAction); // // Make space in the repository by remove // the first suitable message object. // int clean(T_checksum_action checksumAction); // // Increase or decrease the "rating" of // the message object. // int touch(Message *message) const; int untouch(Message *message) const; int getTouches(const int position) const { Message *message = (*messages_)[position]; if (message == NULL) { return 0; } return message -> hits_; } // // Gives a 'weight' to the cached message. A zero // value means object can be safely removed. A value // greater than zero means it is advisable to retain // the object. A negative result means it is mandato- // ry to keep object in cache. // int getRating(Message *message, T_rating type) const; // // Increase or decrease locks of message at given // position. A locked message will not be removed // from the message store until the lock counter // is zero. // int lock(const int position) const; int unlock(const int position) const; int getLocks(const int position) const { Message *message = (*messages_)[position]; if (message == NULL) { return 0; } return message -> locks_; } T_checksum const getChecksum(const int position) const { return getChecksum(get(position)); } T_checksum const getChecksum(const Message *message) const { if (message -> md5_digest_ == NULL) { #ifdef PANIC *logofs << name() << ": PANIC! Checksum not initialized " << "for object at " << message << ".\n" << logofs_flush; #endif cerr << "Error" << ": Checksum not initialized " << "for object at " << message << ".\n"; HandleAbort(); } #ifdef DEBUG *logofs << name() << ": Got checksum for object at " << message << ".\n" << logofs_flush; #endif return message -> md5_digest_; } // // Calculate the checksum on the fly based the // opcode in the buffer. Useful in the case a // message was not processed or was not stored // in the cache. The returned checksum must be // explicitly deallocated by the caller, after // use. // T_checksum getChecksum(const unsigned char *buffer, unsigned int size, int bigEndian); const unsigned char *getData(const Message *message) const { return message -> data_.begin(); } int plainSize(const Message *message) const { return message -> size_; } int identitySize(Message *message) { return message -> i_size_; } int compressedSize(const Message *message) const { return message -> c_size_; } Message *getTemporary() { if (temporary_ == NULL) { temporary_ = create(); } return temporary_; } void resetTemporary() { temporary_ = NULL; } // // On side where we don't have checksums, we // count how many messages are in the array. // This is obviously expensive and should be // only performed when reporting statistics. // int getSize() const { int size = checksums_ -> size(); if (size == 0) { for (int i = 0; i < cacheSlots; i++) { if ((*messages_)[i] != NULL) { size++; } } } return size; } int getLocalStorageSize() const { return localStorageSize_; } int getRemoteStorageSize() const { return remoteStorageSize_; } int getLocalTotalStorageSize() const { return totalLocalStorageSize_; } int getRemoteTotalStorageSize() const { return totalRemoteStorageSize_; } static int getCumulativeTotalStorageSize() { return (totalLocalStorageSize_ > totalRemoteStorageSize_ ? totalLocalStorageSize_ : totalRemoteStorageSize_); } int saveStore(ostream *cachefs, md5_state_t *md5_state_stream, md5_state_t *md5_state_client, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); int loadStore(istream *cachefs, md5_state_t *md5_state_stream, T_checksum_action checksumAction, T_data_action dataAction, int bigEndian); protected: // // Estimate the memory requirements of given // instance of message. Size includes memory // allocated from heap to store checksum and // data. // void storageSize(const Message *message, unsigned int &local, unsigned int &remote) const; // // Just used for debug. // void printStorageSize(); // // Repositories where to save cached messages. // First is a vector of pointers, the second // is a hash table used for fast lookups. // T_messages *messages_; T_checksums *checksums_; // // A message object to be used as a temporary. // Reuse the temporary object if possible, if // not, create a new instance. // Message *temporary_; // // Used to calculate message's checksum. // md5_state_t *md5_state_; private: // // Used to compress data payload. // StaticCompressor *compressor_; // // Keep track of how many bytes // are taken by cache. // int localStorageSize_; int remoteStorageSize_; static int totalLocalStorageSize_; static int totalRemoteStorageSize_; // // Used to track object allocation and deallocation. // #ifdef REFERENCES static int references_; #endif }; // // This is an ancillary class of the message // store, used to encode extensions based on // the minor opcode. // class MinorMessageStore { public: virtual ~MinorMessageStore() { } virtual const char *name() const = 0; virtual int identitySize(const unsigned char *buffer, unsigned int size) = 0; virtual int encodeMessage(EncodeBuffer &encodeBuffer, const unsigned char *buffer, const unsigned int size, int bigEndian, ChannelCache *channelCache) const { return 1; } virtual int decodeMessage(DecodeBuffer &decodeBuffer, unsigned char *&buffer, unsigned int &size, unsigned char type, int bigEndian, WriteBuffer *writeBuffer, ChannelCache *channelCache) const { return 1; } virtual void encodeData(EncodeBuffer &encodeBuffer, const unsigned char *buffer, unsigned int size, int bigEndian, ChannelCache *channelCache) const { } virtual void decodeData(DecodeBuffer &decodeBuffer, unsigned char *buffer, unsigned int size, int bigEndian, ChannelCache *channelCache) const { } virtual int parseIdentity(Message *message, const unsigned char *buffer, unsigned int size, int bigEndian) const = 0; virtual int unparseIdentity(const Message *message, unsigned char *buffer, unsigned int size, int bigEndian) const = 0; virtual void updateIdentity(EncodeBuffer &encodeBuffer, const Message *message, const Message *cachedMessage, ChannelCache *channelCache) const { } virtual void updateIdentity(DecodeBuffer &decodeBuffer, const Message *message, ChannelCache *channelCache) const { } virtual void identityChecksum(const Message *message, const unsigned char *buffer, unsigned int size, md5_state_t *md5_state, int bigEndian) const = 0; }; #endif /* Message_H */