/**************************************************************************/ /* */ /* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ /* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ /* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ /* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ /* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ /* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ /* */ /* NXCOMP, NX protocol compression and NX extensions to this software */ /* are copyright of the aforementioned persons and companies. */ /* */ /* Redistribution and use of the present software is allowed according */ /* to terms specified in the file LICENSE.nxcomp which comes in the */ /* source distribution. */ /* */ /* All rights reserved. */ /* */ /* NOTE: This software has received contributions from various other */ /* contributors, only the core maintainers and supporters are listed as */ /* copyright holders. Please contact us, if you feel you should be listed */ /* as copyright holder, as well. */ /* */ /**************************************************************************/ #ifndef Split_H #define Split_H #include "Types.h" #include "Timestamp.h" #include "Message.h" // // Set the verbosity level. // #define PANIC #define WARNING #undef TEST #undef DEBUG // // Define this to know how many splits // are allocated and deallocated. // #undef REFERENCES // // Size of header of messages saved on // disk. // #define SPLIT_HEADER_SIZE 12 // // This class is used to divide big messages // in smaller chunks and send them at idle // time. // class EncodeBuffer; class DecodeBuffer; class SplitStore; class CommitStore; // // Preferred message streaming policy. // typedef enum { split_none = -1, split_async = 1, split_sync } T_split_mode; // // Current state of the split. Used to // implement the state machine. // typedef enum { split_undefined = -1, split_added, split_missed, split_loaded, split_aborted, split_notified } T_split_state; class Split { friend class SplitStore; friend class CommitStore; public: Split(); ~Split(); // // Note that, differently from the message // store, the split store doesn't account // for the data offset when dealing with // the data. This means that both the size_ // and c_size members represent the actual // size of the data part. // void compressedSize(int size) { c_size_ = size; store_ -> validateSize(d_size_, c_size_); } int compressedSize() { return c_size_; } int plainSize() { return i_size_ + d_size_; } T_checksum getChecksum() { return checksum_; } MessageStore *getStore() { return store_; } T_split_state getState() { return state_; } T_store_action getAction() { return action_; } // // We may need to find the resource // associated to the split message // because old protocol version use // a single store for all splits. // int getResource() { return resource_; } int getRequest() { return store_ -> opcode(); } int getPosition() { return position_; } T_split_mode getMode() { return mode_; } void setPolicy(int load, int save) { load_ = load; save_ = save; } void setState(T_split_state state) { state_ = state; } private: // // The agent's resource which is splitting // the message. // int resource_; // // Where to find the message in the message // store or the X sequence number of the // original request, in recent versions. // int position_; // // Which store is involved. // MessageStore *store_; // // Identity size of the message. // int i_size_; // // This is the uncompressed data size of the // original message. // int d_size_; // // This is the size of the compressed data, // if the data is stored in this form. // int c_size_; // // Size of the data buffer, as known by the // encoding side. This field is only used at // the decoding side. The remote size can be // different from the actual data size, if // the encoding side did not confirm that it // received the abort split event. // int r_size_; // // Position in the data buffer that will be // the target of the next send or receive // operation while streaming the message. // int next_; // // Load or save the split to disk. // int load_; int save_; // // Checksum of the original message. // T_checksum checksum_; // // Was this split confirmed or aborted? // T_split_state state_; // // What's the policy for sending this split? // T_split_mode mode_; // // Operation that had been performed on the // store at the time the split was added. // T_store_action action_; // // Container for the identity and data part // of the X message. // T_data identity_; T_data data_; #ifdef REFERENCES static int references_; #endif }; class SplitStore { public: SplitStore(StaticCompressor *compressor, CommitStore *commits, int resource); ~SplitStore(); Split *getFirstSplit() const { if (splits_ -> size() > 0) { return (*(splits_ -> begin())); } return NULL; } Split *getLastSplit() const { if (splits_ -> size() > 0) { return (*(--(splits_ -> end()))); } return NULL; } int getNodeSize(const Split *split) const { // // Take in account 64 bytes of overhead // for each node. // return (sizeof(class Split) + 64 + split -> i_size_ + split -> d_size_); } int getStorageSize() { return splitStorageSize_; } static int getTotalSize() { return totalSplitSize_; } static int getTotalStorageSize() { return totalSplitStorageSize_; } int getResource() { return resource_; } int getSize() { return splits_ -> size(); } T_splits *getSplits() { return splits_; } // // Used, respectively, at the encoding // and decoding side. // Split *add(MessageStore *store, int resource, T_split_mode mode, int position, T_store_action action, T_checksum checksum, const unsigned char *buffer, const int size); Split *add(MessageStore *store, int resource, int position, T_store_action action, T_checksum checksum, unsigned char *buffer, const int size); // // Handle the streaming of the message data. // int send(EncodeBuffer &encodeBuffer, int packetSize); int receive(DecodeBuffer &decodeBuffer); // // Remove the top element of the split store // and update the storage size. // void remove(Split *split); // // Load the message from disk and replace the // message in the store with the new copy. // int load(Split *split); // // Save the data to disk after the message has // been recomposed at the local side. // int save(Split *split); // // Find the message on disk and update the last // modification time. This is currently unused. // int find(Split *split); // // Remove the element on top of the queue and // discard any split data that still needs to // be transferred. // Split *pop(); // // Dump the content of the store. // void dump(); protected: // // Repository where to add the splits. // T_splits *splits_; // // Compress and decompress the data payload. // StaticCompressor *compressor_; private: int start(EncodeBuffer &encodeBuffer); int start(DecodeBuffer &decodeBuffer); void push(Split *split); // // Determine the name of the file object based // on the checksum. // const char *name(const T_checksum checksum); // // The number of elements and data bytes // in the repository. // int splitStorageSize_; static int totalSplitSize_; static int totalSplitStorageSize_; // // Current element being transferred. // T_splits::iterator current_; // // Repository where to move the splits // after they are completely recomposed. // CommitStore *commits_; // // Index in the client store or none, // if this is a commit store. // int resource_; #ifdef REFERENCES static int references_; #endif }; class CommitStore : public SplitStore { // // This is just a split store. // public: CommitStore(StaticCompressor *compressor) : SplitStore(compressor, NULL, nothing) { } // // Move identity and data of the split to the // provided buffer, uncompressing the message, // if needed. // int expand(Split *split, unsigned char *buffer, const int size); // // We recomposed the data part. If the message // was originally added to the message store, // replace the data and/or update the size. // int update(Split *split); // // Remove the split from the commit queue. // Split *pop(); // // This is just used for debug. It checks // if any message in the message store has // an invalid number of locks. // int validate(Split *split); }; #endif /* Split_H */