diff options
Diffstat (limited to 'nxcomp/src/Split.h')
-rw-r--r-- | nxcomp/src/Split.h | 543 |
1 files changed, 543 insertions, 0 deletions
diff --git a/nxcomp/src/Split.h b/nxcomp/src/Split.h new file mode 100644 index 000000000..ee5eae7fe --- /dev/null +++ b/nxcomp/src/Split.h @@ -0,0 +1,543 @@ +/**************************************************************************/ +/* */ +/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ +/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ +/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ +/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ +/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ +/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ +/* */ +/* NXCOMP, NX protocol compression and NX extensions to this software */ +/* are copyright of the aforementioned persons and companies. */ +/* */ +/* Redistribution and use of the present software is allowed according */ +/* to terms specified in the file LICENSE.nxcomp which comes in the */ +/* source distribution. */ +/* */ +/* All rights reserved. */ +/* */ +/* NOTE: This software has received contributions from various other */ +/* contributors, only the core maintainers and supporters are listed as */ +/* copyright holders. Please contact us, if you feel you should be listed */ +/* as copyright holder, as well. */ +/* */ +/**************************************************************************/ + +#ifndef Split_H +#define Split_H + +#include "Types.h" +#include "Timestamp.h" +#include "Message.h" + +// +// Set the verbosity level. +// + +#define PANIC +#define WARNING +#undef TEST +#undef DEBUG + +// +// Define this to know how many splits +// are allocated and deallocated. +// + +#undef REFERENCES + +// +// Size of header of messages saved on +// disk. +// + +#define SPLIT_HEADER_SIZE 12 + +// +// This class is used to divide big messages +// in smaller chunks and send them at idle +// time. +// + +class EncodeBuffer; +class DecodeBuffer; + +class SplitStore; +class CommitStore; + +// +// Preferred message streaming policy. +// + +typedef enum +{ + split_none = -1, + split_async = 1, + split_sync + +} T_split_mode; + +// +// Current state of the split. Used to +// implement the state machine. +// + +typedef enum +{ + split_undefined = -1, + split_added, + split_missed, + split_loaded, + split_aborted, + split_notified + +} T_split_state; + +class Split +{ + friend class SplitStore; + friend class CommitStore; + + public: + + Split(); + + ~Split(); + + // + // Note that, differently from the message + // store, the split store doesn't account + // for the data offset when dealing with + // the data. This means that both the size_ + // and c_size members represent the actual + // size of the data part. + // + + void compressedSize(int size) + { + c_size_ = size; + + store_ -> validateSize(d_size_, c_size_); + } + + int compressedSize() + { + return c_size_; + } + + int plainSize() + { + return i_size_ + d_size_; + } + + T_checksum getChecksum() + { + return checksum_; + } + + MessageStore *getStore() + { + return store_; + } + + T_split_state getState() + { + return state_; + } + + T_store_action getAction() + { + return action_; + } + + // + // We may need to find the resource + // associated to the split message + // because old protocol version use + // a single store for all splits. + // + + int getResource() + { + return resource_; + } + + int getRequest() + { + return store_ -> opcode(); + } + + int getPosition() + { + return position_; + } + + T_split_mode getMode() + { + return mode_; + } + + void setPolicy(int load, int save) + { + load_ = load; + save_ = save; + } + + void setState(T_split_state state) + { + state_ = state; + } + + private: + + // + // The agent's resource which is splitting + // the message. + // + + int resource_; + + // + // Where to find the message in the message + // store or the X sequence number of the + // original request, in recent versions. + // + + int position_; + + // + // Which store is involved. + // + + MessageStore *store_; + + // + // Identity size of the message. + // + + int i_size_; + + // + // This is the uncompressed data size of the + // original message. + // + + int d_size_; + + // + // This is the size of the compressed data, + // if the data is stored in this form. + // + + int c_size_; + + // + // Size of the data buffer, as known by the + // encoding side. This field is only used at + // the decoding side. The remote size can be + // different from the actual data size, if + // the encoding side did not confirm that it + // received the abort split event. + // + + int r_size_; + + // + // Position in the data buffer that will be + // the target of the next send or receive + // operation while streaming the message. + // + + int next_; + + // + // Load or save the split to disk. + // + + int load_; + int save_; + + // + // Checksum of the original message. + // + + T_checksum checksum_; + + // + // Was this split confirmed or aborted? + // + + T_split_state state_; + + // + // What's the policy for sending this split? + // + + T_split_mode mode_; + + // + // Operation that had been performed on the + // store at the time the split was added. + // + + T_store_action action_; + + // + // Container for the identity and data part + // of the X message. + // + + T_data identity_; + T_data data_; + + #ifdef REFERENCES + + static int references_; + + #endif +}; + +class SplitStore +{ + public: + + SplitStore(StaticCompressor *compressor, CommitStore *commits, int resource); + + ~SplitStore(); + + Split *getFirstSplit() const + { + if (splits_ -> size() > 0) + { + return (*(splits_ -> begin())); + } + + return NULL; + } + + Split *getLastSplit() const + { + if (splits_ -> size() > 0) + { + return (*(--(splits_ -> end()))); + } + + return NULL; + } + + int getNodeSize(const Split *split) const + { + // + // Take in account 64 bytes of overhead + // for each node. + // + + return (sizeof(class Split) + 64 + + split -> i_size_ + split -> d_size_); + } + + int getStorageSize() + { + return splitStorageSize_; + } + + static int getTotalSize() + { + return totalSplitSize_; + } + + static int getTotalStorageSize() + { + return totalSplitStorageSize_; + } + + int getResource() + { + return resource_; + } + + int getSize() + { + return splits_ -> size(); + } + + T_splits *getSplits() + { + return splits_; + } + + // + // Used, respectively, at the encoding + // and decoding side. + // + + Split *add(MessageStore *store, int resource, T_split_mode mode, + int position, T_store_action action, T_checksum checksum, + const unsigned char *buffer, const int size); + + Split *add(MessageStore *store, int resource, int position, + T_store_action action, T_checksum checksum, + unsigned char *buffer, const int size); + + // + // Handle the streaming of the message data. + // + + int send(EncodeBuffer &encodeBuffer, int packetSize); + + int receive(DecodeBuffer &decodeBuffer); + + // + // Remove the top element of the split store + // and update the storage size. + // + + void remove(Split *split); + + // + // Load the message from disk and replace the + // message in the store with the new copy. + // + + int load(Split *split); + + // + // Save the data to disk after the message has + // been recomposed at the local side. + // + + int save(Split *split); + + // + // Find the message on disk and update the last + // modification time. This is currently unused. + // + + int find(Split *split); + + // + // Remove the element on top of the queue and + // discard any split data that still needs to + // be transferred. + // + + Split *pop(); + + // + // Dump the content of the store. + // + + void dump(); + + protected: + + // + // Repository where to add the splits. + // + + T_splits *splits_; + + // + // Compress and decompress the data payload. + // + + StaticCompressor *compressor_; + + private: + + int start(EncodeBuffer &encodeBuffer); + + int start(DecodeBuffer &decodeBuffer); + + void push(Split *split); + + // + // Determine the name of the file object based + // on the checksum. + // + + const char *name(const T_checksum checksum); + + // + // The number of elements and data bytes + // in the repository. + // + + int splitStorageSize_; + + static int totalSplitSize_; + static int totalSplitStorageSize_; + + // + // Current element being transferred. + // + + T_splits::iterator current_; + + // + // Repository where to move the splits + // after they are completely recomposed. + // + + CommitStore *commits_; + + // + // Index in the client store or none, + // if this is a commit store. + // + + int resource_; + + #ifdef REFERENCES + + static int references_; + + #endif +}; + +class CommitStore : public SplitStore +{ + // + // This is just a split store. + // + + public: + + CommitStore(StaticCompressor *compressor) + + : SplitStore(compressor, NULL, nothing) + { + } + + // + // Move identity and data of the split to the + // provided buffer, uncompressing the message, + // if needed. + // + + int expand(Split *split, unsigned char *buffer, const int size); + + // + // We recomposed the data part. If the message + // was originally added to the message store, + // replace the data and/or update the size. + // + + int update(Split *split); + + // + // Remove the split from the commit queue. + // + + Split *pop(); + + // + // This is just used for debug. It checks + // if any message in the message store has + // an invalid number of locks. + // + + int validate(Split *split); +}; + +#endif /* Split_H */ |