diff options
Diffstat (limited to 'nxcomp/Split.cpp')
-rw-r--r-- | nxcomp/Split.cpp | 1835 |
1 files changed, 1835 insertions, 0 deletions
diff --git a/nxcomp/Split.cpp b/nxcomp/Split.cpp new file mode 100644 index 000000000..b6828c7cb --- /dev/null +++ b/nxcomp/Split.cpp @@ -0,0 +1,1835 @@ +/**************************************************************************/ +/* */ +/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */ +/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */ +/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */ +/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */ +/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/ +/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */ +/* */ +/* NXCOMP, NX protocol compression and NX extensions to this software */ +/* are copyright of the aforementioned persons and companies. */ +/* */ +/* Redistribution and use of the present software is allowed according */ +/* to terms specified in the file LICENSE.nxcomp which comes in the */ +/* source distribution. */ +/* */ +/* All rights reserved. */ +/* */ +/* NOTE: This software has received contributions from various other */ +/* contributors, only the core maintainers and supporters are listed as */ +/* copyright holders. Please contact us, if you feel you should be listed */ +/* as copyright holder, as well. */ +/* */ +/**************************************************************************/ + +#include <unistd.h> +#include <cstring> +#include <sys/stat.h> +#include <sys/types.h> +#include <utime.h> + +#include "Misc.h" + +#include "Split.h" + +#include "Control.h" +#include "Statistics.h" + +#include "EncodeBuffer.h" +#include "DecodeBuffer.h" + +#include "StaticCompressor.h" + +#include "Unpack.h" + +// +// Set the verbosity level. +// + +#define PANIC +#define WARNING +#undef TEST +#undef DEBUG +#undef DUMP + +// +// Define this to trace elements +// allocated and deallocated. +// + +#undef REFERENCES + +// +// Counters used for store control. +// + +int SplitStore::totalSplitSize_; +int SplitStore::totalSplitStorageSize_; + +// +// This is used for reference count. +// + +#ifdef REFERENCES + +int Split::references_ = 0; + +#endif + +Split::Split() +{ + resource_ = nothing; + position_ = nothing; + + store_ = NULL; + + d_size_ = 0; + i_size_ = 0; + c_size_ = 0; + r_size_ = 0; + + next_ = 0; + load_ = 0; + save_ = 0; + + checksum_ = NULL; + state_ = split_undefined; + mode_ = split_none; + action_ = is_discarded; + + #ifdef REFERENCES + + references_++; + + *logofs << "Split: Created new Split at " + << this << " out of " << references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +Split::~Split() +{ + delete [] checksum_; + + #ifdef REFERENCES + + references_--; + + *logofs << "Split: Deleted Split at " + << this << " out of " << references_ + << " allocated references.\n" << logofs_flush; + #endif +} + +SplitStore::SplitStore(StaticCompressor *compressor, CommitStore *commits, int resource) + + : compressor_(compressor), commits_(commits), resource_(resource) +{ + splits_ = new T_splits(); + + current_ = splits_ -> end(); + + splitStorageSize_ = 0; + + #ifdef TEST + *logofs << "SplitStore: Created new store ["; + + if (resource_ != nothing) + { + *logofs << resource_; + } + else + { + *logofs << "commit"; + } + + *logofs << "].\n" << logofs_flush; + + *logofs << "SplitStore: Total messages in stores are " + << totalSplitSize_ << " with total storage size " + << totalSplitStorageSize_ << ".\n" + << logofs_flush; + #endif +} + +SplitStore::~SplitStore() +{ + totalSplitSize_ -= splits_ -> size(); + + totalSplitStorageSize_ -= splitStorageSize_; + + for (T_splits::iterator i = splits_ -> begin(); + i != splits_ -> end(); i++) + { + delete *i; + } + + delete splits_; + + #ifdef TEST + *logofs << "SplitStore: Deleted store ["; + + if (resource_ != nothing) + { + *logofs << resource_; + } + else + { + *logofs << "commit"; + } + + *logofs << "] with storage size " << splitStorageSize_ + << ".\n" << logofs_flush; + + *logofs << "SplitStore: Total messages in stores are " + << totalSplitSize_ << " with total storage size " + << totalSplitStorageSize_ << ".\n" + << logofs_flush; + #endif +} + +// +// This is called at the encoding side. +// + +Split *SplitStore::add(MessageStore *store, int resource, T_split_mode mode, + int position, T_store_action action, T_checksum checksum, + const unsigned char *buffer, const int size) +{ + #ifdef TEST + *logofs << "SplitStore: Adding message [" << (unsigned int) store -> + opcode() << "] resource " << resource << " mode " << mode + << " position " << position << " action [" << DumpAction(action) + << "] and checksum [" << DumpChecksum(checksum) << "]" + << ".\n" << logofs_flush; + #endif + + Split *split = new Split(); + + if (split == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Can't allocate " + << "memory for the split.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Can't allocate memory " + << "for the split.\n"; + + HandleAbort(); + } + + split -> store_ = store; + split -> resource_ = resource; + split -> mode_ = mode; + split -> position_ = position; + split -> action_ = action; + + split -> store_ -> validateSize(size); + + // + // The checksum is not provided if the + // message is cached. + // + + if (checksum != NULL) + { + split -> checksum_ = new md5_byte_t[MD5_LENGTH]; + + memcpy(split -> checksum_, checksum, MD5_LENGTH); + } + + // + // We don't need the identity data at the + // encoding side. This qualifies the split + // as a split generated at the encoding + // side. + // + + split -> i_size_ = store -> identitySize(buffer, size); + + split -> d_size_ = size - split -> i_size_; + + if (action == IS_ADDED || action == is_discarded) + { + // + // If the message was added to message + // store or discarded we need to save + // the real data so we can transfer it + // at later time. + // + + split -> data_.resize(split -> d_size_); + + memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_); + + // + // If the message was added, lock it so + // it will not be used by the encoding + // side until it is recomposed. + // + + if (action == IS_ADDED) + { + split -> store_ -> lock(split -> position_); + + #ifdef TEST + + commits_ -> validate(split); + + #endif + } + } + #ifdef WARNING + else + { + *logofs << "SplitStore: WARNING! Not copying data for the cached message.\n" + << logofs_flush; + } + #endif + + push(split); + + return split; +} + +// +// This is called at decoding side. If checksum +// is provided, the message can be searched on +// disk, then, if message is found, an event is +// sent to abort the data transfer. +// + +Split *SplitStore::add(MessageStore *store, int resource, int position, + T_store_action action, T_checksum checksum, + unsigned char *buffer, const int size) +{ + #ifdef TEST + *logofs << "SplitStore: Adding message [" + << (unsigned int) store -> opcode() << "] resource " + << resource << " position " << position << " action [" + << DumpAction(action) << "] and checksum [" + << DumpChecksum(checksum) << "].\n" << logofs_flush; + #endif + + Split *split = new Split(); + + if (split == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Can't allocate " + << "memory for the split.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Can't allocate memory " + << "for the split.\n"; + + HandleAbort(); + } + + split -> store_ = store; + split -> resource_ = resource; + split -> position_ = position; + split -> action_ = action; + + split -> store_ -> validateSize(size); + + // + // Check if the checksum was provided + // by the remote. + // + + if (checksum != NULL) + { + split -> checksum_ = new md5_byte_t[MD5_LENGTH]; + + memcpy(split -> checksum_, checksum, MD5_LENGTH); + } + + split -> i_size_ = store -> identitySize(buffer, size); + + // + // Copy the identity so we can expand the + // message when it is committed. + // + + split -> identity_.resize(split -> i_size_); + + memcpy(split -> identity_.begin(), buffer, split -> i_size_); + + split -> d_size_ = size - split -> i_size_; + + if (action == IS_ADDED || action == is_discarded) + { + // + // The unpack procedure will check if the + // first 2 bytes of the buffer contain the + // pattern and will not try to expand the + // image. + // + + split -> data_.resize(2); + + unsigned char *data = split -> data_.begin(); + + data[0] = SPLIT_PATTERN; + data[1] = SPLIT_PATTERN; + + // + // If the message was added to the store, + // we don't have the data part, yet, so + // we need to lock the message until it + // is recomposed. + // + + if (action == IS_ADDED) + { + split -> store_ -> lock(split -> position_); + + #ifdef TEST + + commits_ -> validate(split); + + #endif + } + } + else + { + #ifdef WARNING + *logofs << "SplitStore: WARNING! Copying data for the cached message.\n" + << logofs_flush; + #endif + + // + // We may optionally take the data from the + // message store in compressed form, but, + // as the data has been decompressed in the + // buffer, we save a further decompression. + // + + split -> data_.resize(split -> d_size_); + + memcpy(split -> data_.begin(), buffer + split -> i_size_, split -> d_size_); + } + + push(split); + + return split; +} + +void SplitStore::push(Split *split) +{ + splits_ -> push_back(split); + + splitStorageSize_ += getNodeSize(split); + + totalSplitSize_++; + + totalSplitStorageSize_ += getNodeSize(split); + + statistics -> addSplit(); + + #ifdef TEST + *logofs << "SplitStore: There are " << splits_ -> size() + << " messages in store [" << resource_ << "] with " + << "storage size " << splitStorageSize_ << ".\n" + << logofs_flush; + + *logofs << "SplitStore: Total messages in stores are " + << totalSplitSize_ << " with total storage size " + << totalSplitStorageSize_ << ".\n" + << logofs_flush; + #endif + + split -> state_ = split_added; +} + +void SplitStore::dump() +{ + #ifdef DUMP + + int n; + + Split *split; + + *logofs << "SplitStore: DUMP! Dumping content of "; + + if (commits_ == NULL) + { + *logofs << "[commits]"; + } + else + { + *logofs << "[splits] for store [" << resource_ << "]"; + } + + *logofs << " with [" << getSize() << "] elements " + << "in the store.\n" << logofs_flush; + + n = 0; + + for (T_splits::iterator i = splits_ -> begin(); i != splits_ -> end(); i++, n++) + { + split = *i; + + *logofs << "SplitStore: DUMP! Split [" << n << "] has action [" + << DumpAction(split -> action_) << "] state [" + << DumpState(split -> state_) << "] "; + + if (split -> resource_ >= 0) + { + *logofs << "resource " << split -> resource_; + } + + *logofs << " request " << (unsigned) split -> store_ -> opcode() + << " position " << split -> position_ << " size is " + << split -> data_.size() << " (" << split -> d_size_ + << "/" << split -> c_size_ << "/" << split -> r_size_ + << ") with " << split -> data_.size() - split -> next_ + << "] bytes to go.\n" << logofs_flush; + } + + #endif +} + +int SplitStore::send(EncodeBuffer &encodeBuffer, int packetSize) +{ + if (splits_ -> size() == 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Function send called with no splits available.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Function send called with no splits available.\n"; + + HandleAbort(); + } + + // + // A start operation must always be executed on + // the split, even in the case the split will be + // later aborted. + // + + if (current_ == splits_ -> end()) + { + start(encodeBuffer); + } + + // + // If we have matched the checksum received from + // the remote side then we must abort the current + // split, else we can send another block of data + // to the remote peer. + // + + Split *split = *current_; + + unsigned int abort = 0; + + if (split -> state_ == split_loaded) + { + abort = 1; + } + + encodeBuffer.encodeBoolValue(abort); + + if (abort == 1) + { + #ifdef TEST + *logofs << "SplitStore: Aborting split for checksum [" + << DumpChecksum(split -> checksum_) << "] position " + << split -> position_ << " with " << (split -> + data_.size() - split -> next_) << " bytes to go " + << "out of " << split -> data_.size() + << ".\n" << logofs_flush; + #endif + + statistics -> addSplitAborted(); + + statistics -> addSplitAbortedBytesOut(split -> data_.size() - split -> next_); + + split -> next_ = split -> data_.size(); + + split -> state_ = split_aborted; + } + else + { + int count = (packetSize <= 0 || split -> next_ + + packetSize > (int) split -> data_.size() ? + split -> data_.size() - split -> next_ : packetSize); + + #ifdef TEST + *logofs << "SplitStore: Sending split for checksum [" + << DumpChecksum(split -> checksum_) << "] count " + << count << " position " << split -> position_ + << ". Data size is " << split -> data_.size() << " (" + << split -> d_size_ << "/" << split -> c_size_ << "), " + << split -> data_.size() - (split -> next_ + count) + << " to go.\n" << logofs_flush; + #endif + + encodeBuffer.encodeValue(count, 32, 10); + + encodeBuffer.encodeMemory(split -> data_.begin() + split -> next_, count); + + split -> next_ += count; + } + + // + // Was data completely transferred? We are the + // sending side. We must update the message in + // store, even if split was aborted. + // + + if (split -> next_ != ((int) split -> data_.size())) + { + return 0; + } + + // + // Move the split at the head of the + // list to the commits. + // + + remove(split); + + // + // Reset current position to the + // end of repository. + // + + current_ = splits_ -> end(); + + #ifdef TEST + *logofs << "SplitStore: Removed split at head of the list. " + << "Resource is " << split -> resource_ << " request " + << (unsigned) split -> store_ -> opcode() << " position " + << split -> position_ << ".\n" << logofs_flush; + #endif + + return 1; +} + +int SplitStore::start(EncodeBuffer &encodeBuffer) +{ + // + // Get the element at the top of the + // list. + // + + current_ = splits_ -> begin(); + + Split *split = *current_; + + #ifdef TEST + *logofs << "SplitStore: Starting split for checksum [" + << DumpChecksum(split -> checksum_) << "] position " + << split -> position_ << " with " << (split -> + data_.size() - split -> next_) << " bytes to go " + << "out of " << split -> data_.size() + << ".\n" << logofs_flush; + #endif + + // + // See if compression of the data part is + // enabled. + // + + if (split -> store_ -> enableCompress) + { + // + // If the split is going to be aborted don't + // compress the data and go straight to the + // send. The new data size will be assumed + // from the disk cache. + // + + if (split -> state_ != split_loaded) + { + unsigned int compressedSize = 0; + unsigned char *compressedData = NULL; + + if (control -> LocalDataCompression && + (compressor_ -> compressBuffer(split -> data_.begin(), split -> d_size_, + compressedData, compressedSize))) + { + // + // Replace the data with the one in + // compressed form. + // + + #ifdef TEST + *logofs << "SplitStore: Split data of size " << split -> d_size_ + << " has been compressed to " << compressedSize + << " bytes.\n" << logofs_flush; + #endif + + split -> data_.clear(); + + split -> data_.resize(compressedSize); + + memcpy(split -> data_.begin(), compressedData, compressedSize); + + split -> c_size_ = compressedSize; + + // + // Inform our peer that the data is + // compressed and send the new size. + // + + encodeBuffer.encodeBoolValue(1); + + encodeBuffer.encodeValue(compressedSize, 32, 14); + + #ifdef TEST + *logofs << "SplitStore: Signaled " << split -> c_size_ + << " bytes of compressed data for this message.\n" + << logofs_flush; + #endif + + return 1; + } + } + #ifdef TEST + else + { + *logofs << "SplitStore: Not trying to compress the " + << "loaded message.\n" << logofs_flush; + } + #endif + + // + // Tell to the remote that data will + // follow uncompressed. + // + + encodeBuffer.encodeBoolValue(0); + } + + return 1; +} + +int SplitStore::start(DecodeBuffer &decodeBuffer) +{ + #ifdef TEST + *logofs << "SplitStore: Going to receive a new split from the remote side.\n" + << logofs_flush; + #endif + + // + // Get the element at the head + // of the list. + // + + current_ = splits_ -> begin(); + + Split *split = *current_; + + unsigned int compressedSize = 0; + + // + // Save the data size known by the remote. + // This information will be needed if the + // remote will not have a chance to abort + // the split. + // + + split -> r_size_ = split -> d_size_; + + // + // Find out if data was compressed by the + // remote. + // + + if (split -> store_ -> enableCompress) + { + decodeBuffer.decodeBoolValue(compressedSize); + + if (compressedSize == 1) + { + // + // Get the compressed size. + // + + // Since ProtoStep7 (#issue 108) + decodeBuffer.decodeValue(compressedSize, 32, 14); + + split -> store_ -> validateSize(split -> d_size_, compressedSize); + + split -> r_size_ = compressedSize; + } + } + + // + // Update the size if the split + // was not already loaded. + // + + if (split -> state_ != split_loaded) + { + split -> data_.clear(); + + if (compressedSize > 0) + { + split -> c_size_ = compressedSize; + + #ifdef TEST + *logofs << "SplitStore: Split data of size " + << split -> d_size_ << " was compressed to " + << split -> c_size_ << " bytes.\n" + << logofs_flush; + #endif + + split -> data_.resize(split -> c_size_); + } + else + { + split -> data_.resize(split -> d_size_); + } + + unsigned char *data = split -> data_.begin(); + + data[0] = SPLIT_PATTERN; + data[1] = SPLIT_PATTERN; + } + #ifdef TEST + else + { + // + // The message had been already + // loaded from disk. + // + + if (compressedSize > 0) + { + if ((int) compressedSize != split -> c_size_) + { + *logofs << "SplitStore: WARNING! Compressed data size is " + << "different than the loaded compressed size.\n" + << logofs_flush; + } + + *logofs << "SplitStore: Ignoring the new size with " + << "loaded compressed size " << split -> c_size_ + << ".\n" << logofs_flush; + } + } + #endif + + return 1; +} + +int SplitStore::receive(DecodeBuffer &decodeBuffer) +{ + if (splits_ -> size() == 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Function receive called with no splits available.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Function receive called with no splits available.\n"; + + HandleAbort(); + } + + if (current_ == splits_ -> end()) + { + start(decodeBuffer); + } + + // + // Check first if split was aborted, else add + // any new data to message being recomposed. + // + + Split *split = *current_; + + unsigned int abort = 0; + + decodeBuffer.decodeBoolValue(abort); + + if (abort == 1) + { + #ifdef TEST + *logofs << "SplitStore: Aborting split for checksum [" + << DumpChecksum(split -> checksum_) << "] position " + << split -> position_ << " with " << (split -> + data_.size() - split -> next_) << " bytes to go " + << "out of " << split -> data_.size() + << ".\n" << logofs_flush; + #endif + + statistics -> addSplitAborted(); + + statistics -> addSplitAbortedBytesOut(split -> r_size_ - split -> next_); + + split -> next_ = split -> r_size_; + + split -> state_ = split_aborted; + } + else + { + // + // Get the size of the packet. + // + + unsigned int count; + + decodeBuffer.decodeValue(count, 32, 10); + + // + // If the split was not already loaded from + // disk, decode the packet and update our + // copy of the data. The encoding side may + // have not received the abort event, yet, + // and may be unaware that the message is + // stored in compressed form at our side. + // + + #ifdef TEST + *logofs << "SplitStore: Receiving split for checksum [" + << DumpChecksum(split -> checksum_) << "] count " + << count << " position " << split -> position_ + << ". Data size is " << split -> data_.size() << " (" + << split -> d_size_ << "/" << split -> c_size_ << "/" + << split -> r_size_ << "), " << split -> r_size_ - + (split -> next_ + count) << " to go.\n" + << logofs_flush; + #endif + + if (split -> next_ + count > (unsigned) split -> r_size_) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Invalid data count " + << count << "provided in the split.\n" + << logofs_flush; + + *logofs << "SplitStore: PANIC! While receiving split for " + << "checksum [" << DumpChecksum(split -> checksum_) + << "] with count " << count << " action [" + << DumpAction(split -> action_) << "] state [" + << DumpState(split -> state_) << "]. Data size is " + << split -> data_.size() << " (" << split -> d_size_ + << "/" << split -> c_size_ << "), " << split -> + data_.size() - (split -> next_ + count) + << " to go.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Invalid data count " + << count << "provided in the split.\n"; + + HandleAbort(); + } + + if (split -> state_ != split_loaded) + { + #ifdef TEST + + if (split -> next_ + count > split -> data_.size()) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Inconsistent split data size " + << split -> data_.size() << " with expected size " + << split -> r_size_ << ".\n" + << logofs_flush; + #endif + + HandleAbort(); + } + + #endif + + memcpy(split -> data_.begin() + split -> next_, + decodeBuffer.decodeMemory(count), count); + } + else + { + #ifdef TEST + *logofs << "SplitStore: WARNING! Data discarded with split " + << "loaded from disk.\n" << logofs_flush; + #endif + + decodeBuffer.decodeMemory(count); + } + + split -> next_ += count; + } + + // + // Is unsplit complete? + // + + if (split -> next_ != split -> r_size_) + { + return 0; + } + + // + // If the persistent cache is enabled, + // we have a valid checksum and the + // split was not originally retrieved + // from disk, save the message on disk. + // + + if (split -> state_ != split_loaded && + split -> state_ != split_aborted) + { + save(split); + } + + // + // Move the split at the head of the + // list to the commits. + // + + remove(split); + + // + // Reset the current position to the + // end of the repository. + // + + current_ = splits_ -> end(); + + #ifdef TEST + *logofs << "SplitStore: Removed split at head of the list. " + << "Resource is " << split -> resource_ << " request " + << (unsigned) split -> store_ -> opcode() << " position " + << split -> position_ << ".\n" << logofs_flush; + #endif + + return 1; +} + +Split *SplitStore::pop() +{ + if (splits_ -> size() == 0) + { + #ifdef TEST + *logofs << "SplitStore: The split store is empty.\n" + << logofs_flush; + #endif + + return NULL; + } + + // + // Move the pointer at the end of the list. + // The next send operation will eventually + // start a new split. + // + + current_ = splits_ -> end(); + + Split *split = *(splits_ -> begin()); + + splits_ -> pop_front(); + + #ifdef TEST + *logofs << "SplitStore: Removed split at the head of the " + << "list with resource " << split -> resource_ + << " request " << (unsigned) split -> store_ -> + opcode() << " position " << split -> position_ + << ".\n" << logofs_flush; + #endif + + splitStorageSize_ -= getNodeSize(split); + + totalSplitSize_--; + + totalSplitStorageSize_ -= getNodeSize(split); + + #ifdef TEST + *logofs << "SplitStore: There are " << splits_ -> size() + << " messages in store [" << resource_ << "] with " + << "storage size " << splitStorageSize_ << ".\n" + << logofs_flush; + + *logofs << "SplitStore: Total messages in stores are " + << totalSplitSize_ << " with total storage size " + << totalSplitStorageSize_ << ".\n" + << logofs_flush; + #endif + + return split; +} + +void SplitStore::remove(Split *split) +{ + #ifdef TEST + *logofs << "SplitStore: Going to remove the split from the list.\n" + << logofs_flush; + #endif + + #ifdef TEST + + if (split != getFirstSplit()) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Trying to remove a split " + << "not at the head of the list.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Trying to remove a split " + << "not at the head of the list.\n"; + + HandleAbort(); + } + + #endif + + // + // Move the split to the commit store. + // + + splits_ -> pop_front(); + + commits_ -> splits_ -> push_back(split); + + splitStorageSize_ -= getNodeSize(split); + + totalSplitSize_--; + + totalSplitStorageSize_ -= getNodeSize(split); + + #ifdef TEST + *logofs << "SplitStore: There are " << splits_ -> size() + << " messages in store [" << resource_ << "] with " + << "storage size " << splitStorageSize_ << ".\n" + << logofs_flush; + + *logofs << "SplitStore: Total messages in stores are " + << totalSplitSize_ << " with total storage size " + << totalSplitStorageSize_ << ".\n" + << logofs_flush; + #endif + + #ifdef TEST + + if (splits_ -> size() == 0) + { + if (splitStorageSize_ != 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Internal error calculating " + << "split data size. It is " << splitStorageSize_ + << " while should be 0.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Internal error calculating " + << "split data size. It is " << splitStorageSize_ + << " while should be 0.\n"; + + HandleAbort(); + } + } + + #endif +} + +const char *SplitStore::name(const T_checksum checksum) +{ + if (checksum == NULL) + { + return NULL; + } + + char *pathName = control -> ImageCachePath; + + if (pathName == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot determine directory of " + << "NX image files.\n" << logofs_flush; + #endif + + return NULL; + } + + int pathSize = strlen(pathName); + + // + // File name is "[path][/I-c/I-][checksum][\0]", + // where c is the first hex digit of checksum. + // + + int nameSize = pathSize + 7 + MD5_LENGTH * 2 + 1; + + char *fileName = new char[nameSize]; + + if (fileName == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot allocate space for " + << "NX image file name.\n" << logofs_flush; + #endif + + return NULL; + } + + strcpy(fileName, pathName); + + sprintf(fileName + pathSize, "/I-%1X/I-", + *((unsigned char *) checksum) >> 4); + + for (unsigned int i = 0; i < MD5_LENGTH; i++) + { + sprintf(fileName + pathSize + 7 + (i * 2), "%02X", + ((unsigned char *) checksum)[i]); + } + + return fileName; +} + +int SplitStore::save(Split *split) +{ + // + // Check if saving the message on the + // persistent cache is enabled. + // + + if (split -> save_ == 0) + { + return 0; + } + + T_checksum checksum = split -> checksum_; + + const char *fileName = name(checksum); + + if (fileName == NULL) + { + return 0; + } + + unsigned int splitSize; + + ostream *fileStream = NULL; + + unsigned char *fileHeader = NULL; + + // + // Get the other data from the split. + // + + unsigned char opcode = split -> store_ -> opcode(); + + unsigned char *data = split -> data_.begin(); + + int dataSize = split -> d_size_; + int compressedSize = split -> c_size_; + + #ifdef DEBUG + *logofs << "SplitStore: Going to save split OPCODE#" + << (unsigned int) opcode << " to file '" << fileName + << "' with size " << dataSize << " and compressed size " + << compressedSize << ".\n" << logofs_flush; + #endif + + DisableSignals(); + + // + // Change the mask to make the file only + // readable by the user, then restore the + // old mask. + // + + mode_t fileMode; + + // + // Check if the file already exists. We try to + // load the message when the split is started + // and save it only if it is not found. Still + // the remote side may send the same image mul- + // tiple time and we may not have the time to + // notify the abort. + // + + struct stat fileStat; + + if (stat(fileName, &fileStat) == 0) + { + #ifdef TEST + *logofs << "SplitStore: Image file '" << fileName + << "' already present on disk.\n" + << logofs_flush; + #endif + + goto SplitStoreSaveError; + } + + fileMode = umask(0077); + + fileStream = new ofstream(fileName, ios::out | ios::binary); + + umask(fileMode); + + if (CheckData(fileStream) < 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot open file '" << fileName + << "' for output.\n" << logofs_flush; + #endif + + goto SplitStoreSaveError; + } + + fileHeader = new unsigned char[SPLIT_HEADER_SIZE]; + + if (fileHeader == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot allocate space for " + << "NX image header.\n" << logofs_flush; + #endif + + goto SplitStoreSaveError; + } + + // + // Leave 3 bytes for future use. Please note + // that, on some CPUs, we can't use PutULONG() + // to write integers that are not aligned to + // the word boundary. + // + + *fileHeader = opcode; + + *(fileHeader + 1) = 0; + *(fileHeader + 2) = 0; + *(fileHeader + 3) = 0; + + PutULONG(dataSize, fileHeader + 4, false); + PutULONG(compressedSize, fileHeader + 8, false); + + splitSize = (compressedSize > 0 ? compressedSize : dataSize); + + if (PutData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0 || + PutData(fileStream, data, splitSize) < 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot write to NX " + << "image file '" << fileName << "'.\n" + << logofs_flush; + #endif + + goto SplitStoreSaveError; + } + + // + // Check if all the data was written on the + // disk and, if not, remove the faulty copy. + // + + FlushData(fileStream); + + if (CheckData(fileStream) < 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Failed to write NX " + << "image file '" << fileName << "'.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Failed to write NX " + << "image file '" << fileName << "'.\n"; + + goto SplitStoreSaveError; + } + + #ifdef TEST + *logofs << "SplitStore: Saved split to file '" << fileName + << "' with data size " << dataSize << " and " + << "compressed data size " << compressedSize + << ".\n" << logofs_flush; + #endif + + delete fileStream; + + delete [] fileName; + delete [] fileHeader; + + EnableSignals(); + + // + // Update the timestamp as the operation + // may have taken some time. + // + + getNewTimestamp(); + + return 1; + +SplitStoreSaveError: + + delete fileStream; + + if (fileName != NULL) + { + unlink(fileName); + } + + delete [] fileName; + delete [] fileHeader; + + EnableSignals(); + + return -1; +} + +int SplitStore::find(Split *split) +{ + const char *fileName = name(split -> checksum_); + + if (fileName == NULL) + { + return 0; + } + + #ifdef DEBUG + *logofs << "SplitStore: Going to find split OPCODE#" + << (unsigned) split -> store_ -> opcode() + << " in file '" << fileName << "'.\n" + << logofs_flush; + #endif + + // + // Check if the file exists and, at the + // same time, update the modification + // time to prevent its deletion. + // + + if (utime(fileName, NULL) == 0) + { + #ifdef TEST + *logofs << "SplitStore: Found split OPCODE#" + << (unsigned) split -> store_ -> opcode() + << " in file '" << fileName << "'.\n" + << logofs_flush; + #endif + + delete [] fileName; + + return 1; + } + + #ifdef TEST + *logofs << "SplitStore: WARNING! Can't find split " + << "OPCODE#" << (unsigned) split -> store_ -> + opcode() << " in file '" << fileName + << "'.\n" << logofs_flush; + #endif + + delete [] fileName; + + return 0; +} + +int SplitStore::load(Split *split) +{ + // + // Check if loading the image is enabled. + // + + if (split -> load_ == 0) + { + return 0; + } + + const char *fileName = name(split -> checksum_); + + if (fileName == NULL) + { + return 0; + } + + unsigned char fileOpcode; + + int fileSize; + int fileCSize; + + istream *fileStream = NULL; + + unsigned char *fileHeader = NULL; + + DisableSignals(); + + #ifdef DEBUG + *logofs << "SplitStore: Going to load split OPCODE#" + << (unsigned int) split -> store_ -> opcode() + << " from file '" << fileName << "'.\n" + << logofs_flush; + #endif + + fileStream = new ifstream(fileName, ios::in | ios::binary); + + if (CheckData(fileStream) < 0) + { + #ifdef TEST + *logofs << "SplitStore: WARNING! Can't open image file '" + << fileName << "' on disk.\n" << logofs_flush; + #endif + + goto SplitStoreLoadError; + } + + fileHeader = new unsigned char[SPLIT_HEADER_SIZE]; + + if (fileHeader == NULL) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot allocate space for " + << "NX image header.\n" << logofs_flush; + #endif + + cerr << "Error" << ": Cannot allocate space for " + << "NX image header.\n"; + + goto SplitStoreLoadError; + } + + if (GetData(fileStream, fileHeader, SPLIT_HEADER_SIZE) < 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot read header from " + << "NX image file '" << fileName << "'.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Cannot read header from " + << "NX image file '" << fileName << "'.\n"; + + goto SplitStoreLoadError; + } + + fileOpcode = *fileHeader; + + fileSize = GetULONG(fileHeader + 4, false); + fileCSize = GetULONG(fileHeader + 8, false); + + // + // Don't complain if we find that data was saved + // in compressed form even if we were not aware + // of the compressed data size. The remote side + // compresses the data only at the time it starts + // the transferral of the split. We replace our + // copy of the data with whatever we find on the + // disk. + // + + if (fileOpcode != split -> store_ -> opcode() || + fileSize != split -> d_size_ || + fileSize > control -> MaximumRequestSize || + fileCSize > control -> MaximumRequestSize) + + { + #ifdef TEST + *logofs << "SplitStore: PANIC! Corrupted image file '" << fileName + << "'. Expected " << (unsigned int) split -> store_ -> opcode() + << "/" << split -> d_size_ << "/" << split -> c_size_ << " found " + << (unsigned int) fileOpcode << "/" << fileSize << "/" + << fileCSize << ".\n" << logofs_flush; + #endif + + cerr << "Warning" << ": Corrupted image file '" << fileName + << "'. Expected " << (unsigned int) split -> store_ -> opcode() + << "/" << split -> d_size_ << "/" << split -> c_size_ << " found " + << (unsigned int) fileOpcode << "/" << fileSize << "/" + << fileCSize << ".\n"; + + goto SplitStoreLoadError; + } + + // + // Update the data size with the size + // we got from the disk record. + // + + split -> d_size_ = fileSize; + split -> c_size_ = fileCSize; + + unsigned int splitSize; + + if (fileCSize > 0) + { + splitSize = fileCSize; + } + else + { + splitSize = fileSize; + } + + // + // Allocate a new buffer if we didn't + // do that already or if the size is + // different. + // + + if (split -> data_.size() != splitSize) + { + split -> data_.clear(); + + split -> data_.resize(splitSize); + } + + if (GetData(fileStream, split -> data_.begin(), splitSize) < 0) + { + #ifdef PANIC + *logofs << "SplitStore: PANIC! Cannot read data from " + << "NX image file '" << fileName << "'.\n" + << logofs_flush; + #endif + + cerr << "Warning" << ": Cannot read data from " + << "NX image file '" << fileName << "'.\n"; + + goto SplitStoreLoadError; + } + + delete fileStream; + + delete [] fileHeader; + delete [] fileName; + + EnableSignals(); + + // + // Update the timestamp as the operation + // may have taken some time. + // + + getNewTimestamp(); + + return 1; + +SplitStoreLoadError: + + delete fileStream; + + unlink(fileName); + + delete [] fileName; + delete [] fileHeader; + + EnableSignals(); + + return -1; +} + +Split *CommitStore::pop() +{ + if (splits_ -> size() == 0) + { + #ifdef TEST + *logofs << "CommitStore: The commit store is empty.\n" + << logofs_flush; + #endif + + return NULL; + } + + Split *split = *(splits_ -> begin()); + + splits_ -> pop_front(); + + #ifdef TEST + *logofs << "CommitStore: Removed commit split at the head " + << "of the list with resource " << split -> resource_ + << " request " << (unsigned) split -> store_ -> + opcode() << " position " << split -> position_ + << ".\n" << logofs_flush; + #endif + + return split; +} + +int CommitStore::expand(Split *split, unsigned char *buffer, const int size) +{ + #ifdef TEST + *logofs << "CommitStore: Expanding split data with " + << size << " bytes to write.\n" + << logofs_flush; + #endif + + #ifdef TEST + + if (size < split -> i_size_ + split -> d_size_) + { + #ifdef PANIC + *logofs << "CommitStore: PANIC! Wrong size of the provided " + << "buffer. It should be " << split -> i_size_ + + split -> d_size_ << " instead of " << size + << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Wrong size of the provided " + << "buffer. It should be " << split -> i_size_ + + split -> d_size_ << " instead of " << size + << ".\n"; + + HandleAbort(); + } + + #endif + + #ifdef DEBUG + *logofs << "CommitStore: Copying " << split -> i_size_ + << " bytes of identity.\n" << logofs_flush; + #endif + + memcpy(buffer, split -> identity_.begin(), split -> i_size_); + + // + // Copy data, if any, to the buffer. + // + + if (size > split -> i_size_) + { + // + // Check if message has been stored + // in compressed format. + // + + if (split -> c_size_ == 0) + { + #ifdef DEBUG + *logofs << "CommitStore: Copying " << split -> d_size_ + << " bytes of plain data.\n" << logofs_flush; + #endif + + memcpy(buffer + split -> i_size_, split -> data_.begin(), split -> d_size_); + } + else + { + #ifdef DEBUG + *logofs << "CommitStore: Decompressing " << split -> c_size_ + << " bytes and copying " << split -> d_size_ + << " bytes of data.\n" << logofs_flush; + #endif + + if (compressor_ -> + decompressBuffer(buffer + split -> i_size_, + split -> d_size_, split -> data_.begin(), + split -> c_size_) < 0) + { + #ifdef PANIC + *logofs << "CommitStore: PANIC! Split data decompression failed.\n" + << logofs_flush; + #endif + + cerr << "Error" << ": Split data decompression failed.\n"; + + return -1; + } + } + } + + return 1; +} + +int CommitStore::update(Split *split) +{ + if (split -> action_ != IS_ADDED) + { + return 0; + } + + // + // We don't need the identity data at + // the encoding side. + // + + if (split -> identity_.size() == 0) + { + #ifdef TEST + *logofs << "SplitStore: Going to update the size " + << "for object at position " << split -> position_ + << " with data size " << split -> d_size_ + << " and compressed data size " << split -> + c_size_ << ".\n" << logofs_flush; + #endif + + split -> store_ -> updateData(split -> position_, split -> d_size_, + split -> c_size_); + } + else + { + #ifdef TEST + *logofs << "SplitStore: Going to update data and size " + << "for object at position " << split -> position_ + << " with data size " << split -> d_size_ + << " and compressed data size " << split -> + c_size_ << ".\n" << logofs_flush; + #endif + + split -> store_ -> updateData(split -> position_, split -> data_.begin(), + split -> d_size_, split -> c_size_); + } + + // + // Unlock message so that we can remove + // or save it on disk at shutdown. + // + + if (split -> action_ == IS_ADDED) + { + split -> store_ -> unlock(split -> position_); + + #ifdef TEST + + validate(split); + + #endif + } + + return 1; +} + +int CommitStore::validate(Split *split) +{ + MessageStore *store = split -> store_; + + int p, n, s; + + s = store -> cacheSlots; + + for (p = 0, n = 0; p < s; p++) + { + if (store -> getLocks(p) == 1) + { + n++; + } + else if (store -> getLocks(p) != 0) + { + #ifdef PANIC + *logofs << "CommitStore: PANIC! Repository for OPCODE#" + << (unsigned int) store -> opcode() << " has " + << store -> getLocks(p) << " locks for message " + << "at position " << p << ".\n" << logofs_flush; + #endif + + cerr << "Error" << ": Repository for OPCODE#" + << (unsigned int) store -> opcode() << " has " + << store -> getLocks(p) << " locks for message " + << "at position " << p << ".\n"; + + HandleAbort(); + } + } + + #ifdef TEST + *logofs << "CommitStore: Repository for OPCODE#" + << (unsigned int) store -> opcode() + << " has " << n << " locked messages.\n" + << logofs_flush; + #endif + + return 1; +} |