aboutsummaryrefslogtreecommitdiff
path: root/nxcomp/src/Message.cpp
diff options
context:
space:
mode:
authorMike Gabriel <mike.gabriel@das-netzwerkteam.de>2017-06-30 20:13:51 +0200
committerMike Gabriel <mike.gabriel@das-netzwerkteam.de>2017-07-26 10:12:43 +0200
commitf76c82403888bb498973ec974dbfd20e4edb02fe (patch)
treebe0cb6c112d9d9fb46387fbd114727510197ddec /nxcomp/src/Message.cpp
parent9193d11eeeea933e293acd5e0f03fa4e9887186b (diff)
downloadnx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.gz
nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.tar.bz2
nx-libs-f76c82403888bb498973ec974dbfd20e4edb02fe.zip
nxcomp: Switch to autoreconf.
Diffstat (limited to 'nxcomp/src/Message.cpp')
-rw-r--r--nxcomp/src/Message.cpp2343
1 files changed, 2343 insertions, 0 deletions
diff --git a/nxcomp/src/Message.cpp b/nxcomp/src/Message.cpp
new file mode 100644
index 000000000..b75d90c24
--- /dev/null
+++ b/nxcomp/src/Message.cpp
@@ -0,0 +1,2343 @@
+/**************************************************************************/
+/* */
+/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */
+/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */
+/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */
+/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */
+/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
+/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */
+/* */
+/* NXCOMP, NX protocol compression and NX extensions to this software */
+/* are copyright of the aforementioned persons and companies. */
+/* */
+/* Redistribution and use of the present software is allowed according */
+/* to terms specified in the file LICENSE.nxcomp which comes in the */
+/* source distribution. */
+/* */
+/* All rights reserved. */
+/* */
+/* NOTE: This software has received contributions from various other */
+/* contributors, only the core maintainers and supporters are listed as */
+/* copyright holders. Please contact us, if you feel you should be listed */
+/* as copyright holder, as well. */
+/* */
+/**************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <cstdio>
+#include <unistd.h>
+#include <cstring>
+
+#include <algorithm>
+
+#include "Misc.h"
+
+//
+// We need channel's cache data.
+//
+
+#include "Message.h"
+
+#include "EncodeBuffer.h"
+#include "DecodeBuffer.h"
+
+//
+// Set the verbosity level. You also
+// need to define DUMP in Misc.cpp
+// if DUMP is defined here.
+//
+
+#define PANIC
+#define WARNING
+#undef TEST
+#undef DEBUG
+#undef DUMP
+
+//
+// Define this to log when messages
+// are allocated and deallocated.
+//
+
+#undef REFERENCES
+
+//
+// Keep track of how many bytes are
+// occupied by cache.
+//
+
+int MessageStore::totalLocalStorageSize_ = 0;
+int MessageStore::totalRemoteStorageSize_ = 0;
+
+//
+// These are used for reference count.
+//
+
+#ifdef REFERENCES
+
+int Message::references_ = 0;
+int MessageStore::references_ = 0;
+
+#endif
+
+//
+// Here are the methods to handle cached messages.
+//
+
+MessageStore::MessageStore(StaticCompressor *compressor)
+
+ : compressor_(compressor)
+{
+ //
+ // Public members.
+ //
+
+ enableCache = MESSAGE_ENABLE_CACHE;
+ enableData = MESSAGE_ENABLE_DATA;
+ enableSplit = MESSAGE_ENABLE_SPLIT;
+ enableCompress = MESSAGE_ENABLE_COMPRESS;
+
+ dataLimit = MESSAGE_DATA_LIMIT;
+ dataOffset = MESSAGE_DATA_OFFSET;
+
+ cacheSlots = MESSAGE_CACHE_SLOTS;
+ cacheThreshold = MESSAGE_CACHE_THRESHOLD;
+ cacheLowerThreshold = MESSAGE_CACHE_LOWER_THRESHOLD;
+
+ #ifdef TEST
+ *logofs << "MessageStore: Static compressor is at "
+ << compressor_ << ".\n" << logofs_flush;
+ #endif
+
+ md5_state_ = new md5_state_t();
+
+ #ifdef DEBUG
+ *logofs << "MessageStore: Created MD5 state for object at "
+ << this << ".\n" << logofs_flush;
+ #endif
+
+ lastAdded = cacheSlots;
+ lastHit = 0;
+ lastRemoved = 0;
+ lastRated = nothing;
+ lastAction = is_discarded;
+
+ //
+ // Private members.
+ //
+
+ localStorageSize_ = 0;
+ remoteStorageSize_ = 0;
+
+ #ifdef TEST
+ *logofs << "MessageStore: Size of total cache is "
+ << totalLocalStorageSize_ << " bytes at local side and "
+ << totalRemoteStorageSize_ << " bytes at remote side.\n"
+ << logofs_flush;
+ #endif
+
+ messages_ = new T_messages();
+ checksums_ = new T_checksums();
+
+ temporary_ = NULL;
+
+ #ifdef REFERENCES
+
+ references_++;
+
+ *logofs << "MessageStore: Created new store at "
+ << this << "out of " << references_
+ << " allocated stores.\n" << logofs_flush;
+
+ #endif
+}
+
+MessageStore::~MessageStore()
+{
+ //
+ // The virtual destructor of specialized class
+ // must get rid of both messages in container
+ // and temporary.
+ //
+
+ #ifdef DEBUG
+ *logofs << "MessageStore: Deleting MD5 state for object at "
+ << this << ".\n" << logofs_flush;
+ #endif
+
+ delete md5_state_;
+
+ delete messages_;
+ delete checksums_;
+
+ //
+ // Update the static members tracking
+ // size of total memory allocated for
+ // all stores.
+ //
+
+ totalLocalStorageSize_ -= localStorageSize_;
+ totalRemoteStorageSize_ -= remoteStorageSize_;
+
+ #ifdef TEST
+ *logofs << "MessageStore: Size of total cache is "
+ << totalLocalStorageSize_ << " bytes at local side and "
+ << totalRemoteStorageSize_ << " bytes at remote side.\n"
+ << logofs_flush;
+ #endif
+
+ #ifdef REFERENCES
+
+ references_--;
+
+ *logofs << "MessageStore: Deleted store at "
+ << this << " out of " << references_
+ << " allocated stores.\n" << logofs_flush;
+
+ #endif
+}
+
+//
+// Here are the methods to parse and cache
+// messages in the message stores.
+//
+
+int MessageStore::parse(Message *message, int split, const unsigned char *buffer,
+ unsigned int size, T_checksum_action checksumAction,
+ T_data_action dataAction, int bigEndian)
+{
+ //
+ // Save the message size as received on the link.
+ // This information will be used to create an ap-
+ // propriate buffer at the time the message will
+ // be unparsed.
+ //
+
+ message -> size_ = size;
+ message -> i_size_ = identitySize(buffer, size);
+ message -> c_size_ = 0;
+
+ validateSize(size);
+
+ if (checksumAction == use_checksum)
+ {
+ beginChecksum(message);
+
+ parseIdentity(message, buffer, size, bigEndian);
+
+ identityChecksum(message, buffer, size, bigEndian);
+
+ parseData(message, split, buffer, size, checksumAction, dataAction, bigEndian);
+
+ endChecksum(message);
+ }
+ else
+ {
+ parseIdentity(message, buffer, size, bigEndian);
+
+ parseData(message, split, buffer, size, checksumAction, dataAction, bigEndian);
+ }
+
+ return 1;
+}
+
+int MessageStore::parse(Message *message, const unsigned char *buffer,
+ unsigned int size, const unsigned char *compressedData,
+ const unsigned int compressedDataSize,
+ T_checksum_action checksumAction,
+ T_data_action dataAction, int bigEndian)
+{
+ int offset = identitySize(buffer, size);
+
+ message -> size_ = size;
+ message -> i_size_ = offset;
+ message -> c_size_ = compressedDataSize + offset;
+
+ validateSize(message -> size_ - offset, compressedDataSize);
+
+ if (checksumAction == use_checksum)
+ {
+ beginChecksum(message);
+
+ parseIdentity(message, buffer, size, bigEndian);
+
+ identityChecksum(message, buffer, size, bigEndian);
+
+ parseData(message, buffer, size, compressedData, compressedDataSize,
+ checksumAction, dataAction, bigEndian);
+
+ endChecksum(message);
+ }
+ else
+ {
+ parseIdentity(message, buffer, size, bigEndian);
+
+ parseData(message, buffer, size, compressedData, compressedDataSize,
+ checksumAction, dataAction, bigEndian);
+ }
+
+ return 1;
+}
+
+int MessageStore::parseData(Message *message, int split, const unsigned char *buffer,
+ unsigned int size, T_checksum_action checksumAction,
+ T_data_action dataAction, int bigEndian)
+{
+ if ((int) size > message -> i_size_)
+ {
+ unsigned int dataSize = size - message -> i_size_;
+
+ if (checksumAction == use_checksum)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Calculating checksum of object at "
+ << message << " with data size " << dataSize
+ << ".\n" << logofs_flush;
+ #endif
+
+ dataChecksum(message, buffer, size, bigEndian);
+ }
+
+ if (dataAction == discard_data)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Discarded " << dataSize
+ << " bytes of plain data. Real size is "
+ << message -> size_ << " compressed size is "
+ << message -> c_size_ << ".\n" << logofs_flush;
+ #endif
+
+ return 1;
+ }
+
+ //
+ // Accept anyway data beyond the
+ // expected limit.
+ //
+
+ #ifdef TEST
+
+ if (dataSize > (unsigned int) dataLimit)
+ {
+ *logofs << name() << ": WARNING! Data is " << dataSize
+ << " bytes. Ignoring the established limit.\n"
+ << logofs_flush;
+ }
+
+ #endif
+
+ if (dataSize != message -> data_.size())
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Data will be resized from "
+ << message -> data_.size() << " to hold a plain buffer of "
+ << dataSize << " bytes.\n" << logofs_flush;
+ #endif
+
+ message -> data_.clear();
+
+ message -> data_.resize(dataSize);
+ }
+
+ if (split == 0)
+ {
+ memcpy(message -> data_.begin(), buffer + message -> i_size_, dataSize);
+ }
+ #ifdef TEST
+ else
+ {
+ *logofs << name() << ": Not copied " << dataSize
+ << " bytes of fake data for the split message.\n"
+ << logofs_flush;
+ }
+ #endif
+
+ #ifdef DEBUG
+ *logofs << name() << ": Parsed " << dataSize
+ << " bytes of plain data. Real size is "
+ << message -> size_ << " compressed size is "
+ << message -> c_size_ << ".\n" << logofs_flush;
+ #endif
+ }
+
+ return 1;
+}
+
+//
+// Store the data part in compressed format.
+//
+
+int MessageStore::parseData(Message *message, const unsigned char *buffer,
+ unsigned int size, const unsigned char *compressedData,
+ const unsigned int compressedDataSize,
+ T_checksum_action checksumAction,
+ T_data_action dataAction, int bigEndian)
+{
+ if ((int) size > message -> i_size_)
+ {
+ unsigned int dataSize = size - message -> i_size_;
+
+ if (checksumAction == use_checksum)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Calculating checksum of object at "
+ << message << " with data size " << dataSize
+ << ".\n" << logofs_flush;
+ #endif
+
+ dataChecksum(message, buffer, size, bigEndian);
+ }
+
+ if (dataAction == discard_data)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Discarded " << dataSize
+ << " bytes of compressed data. Real size is "
+ << message -> size_ << " compressed size is "
+ << message -> c_size_ << ".\n" << logofs_flush;
+ #endif
+
+ return 1;
+ }
+
+ #ifdef WARNING
+ if (dataSize > (unsigned int) dataLimit)
+ {
+ *logofs << name() << ": WARNING! Data is " << dataSize
+ << " bytes. Ignoring the established limit!\n"
+ << logofs_flush;
+ }
+ #endif
+
+ dataSize = compressedDataSize;
+
+ if (dataSize != message -> data_.size())
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Data will be resized from "
+ << message -> data_.size() << " to hold a compressed buffer of "
+ << dataSize << " bytes.\n" << logofs_flush;
+ #endif
+
+ message -> data_.clear();
+
+ message -> data_.resize(compressedDataSize);
+ }
+
+ memcpy(message -> data_.begin(), compressedData, compressedDataSize);
+
+ #ifdef DEBUG
+ *logofs << name() << ": Parsed " << dataSize
+ << " bytes of compressed data. Real size is "
+ << message -> size_ << " compressed size is "
+ << message -> c_size_ << ".\n" << logofs_flush;
+ #endif
+ }
+
+ return 1;
+}
+
+int MessageStore::unparseData(const Message *message, unsigned char *buffer,
+ unsigned int size, int bigEndian)
+{
+ //
+ // Copy data, if any, to the buffer.
+ //
+
+ if ((int) size > message -> i_size_)
+ {
+ //
+ // Check if message has been stored
+ // in compressed format.
+ //
+
+ if (message -> c_size_ == 0)
+ {
+ memcpy(buffer + message -> i_size_, message -> data_.begin(), size - message -> i_size_);
+
+ #ifdef DEBUG
+ *logofs << name() << ": Unparsed " << message -> size_ - message -> i_size_
+ << " bytes of data to a buffer of " << message -> size_ - message -> i_size_
+ << ".\n" << logofs_flush;
+ #endif
+ }
+ else
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Using static compressor at " << (void *) compressor_
+ << ".\n" << logofs_flush;
+ #endif
+
+ if (compressor_ ->
+ decompressBuffer(buffer + message -> i_size_,
+ size - message -> i_size_,
+ message -> data_.begin(),
+ message -> c_size_ - message -> i_size_) < 0)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Data decompression failed.\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Data decompression failed.\n";
+
+ return -1;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Unparsed " << message -> c_size_ - message -> i_size_
+ << " bytes of compressed data to a buffer of "
+ << message -> size_ - message -> i_size_ << ".\n" << logofs_flush;
+ #endif
+ }
+ }
+
+ //
+ // We could write size to the buffer but this
+ // is something the channel class is doing by
+ // itself.
+ //
+ // PutUINT(size >> 2, buffer + 2, bigEndian);
+ //
+
+ return 1;
+}
+
+void MessageStore::dumpData(const Message *message) const
+{
+ #ifdef DUMP
+
+ *logofs << name() << ": Dumping enumerated data:\n" << logofs_flush;
+
+ DumpData(message -> data_.begin(), message -> data_.size());
+
+ #endif
+
+ #ifdef DUMP
+
+ *logofs << name() << ": Dumping checksum data:\n" << logofs_flush;
+
+ DumpData(message -> md5_digest_, MD5_LENGTH);
+
+ #endif
+}
+
+T_checksum MessageStore::getChecksum(const unsigned char *buffer,
+ unsigned int size, int bigEndian)
+{
+ Message *message = getTemporary();
+
+ message -> size_ = size;
+ message -> i_size_ = identitySize(buffer, size);
+ message -> c_size_ = 0;
+
+ validateSize(size);
+
+ beginChecksum(message);
+
+ //
+ // We don't need to extract the identity
+ // data from the buffer.
+ //
+ // parseIdentity(message, buffer, size, bigEndian);
+ //
+
+ identityChecksum(message, buffer, size, bigEndian);
+
+ parseData(message, 0, buffer, size, use_checksum, discard_data, bigEndian);
+
+ endChecksum(message);
+
+ //
+ // The caller will have to explicitly
+ // deallocated the memory after use.
+ //
+
+ T_checksum checksum = new md5_byte_t[MD5_LENGTH];
+
+ memcpy(checksum, message -> md5_digest_, MD5_LENGTH);
+
+ return checksum;
+}
+
+int MessageStore::clean(T_checksum_action checksumAction)
+{
+ int position = lastRemoved + 1;
+
+ if (position >= cacheSlots)
+ {
+ position = 0;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Searching a message to remove "
+ << "starting at position " << position
+ << " with " << checksums_ -> size()
+ << " elements in cache.\n"
+ << logofs_flush;
+ #endif
+
+ while (position != lastRemoved)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Examining position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ if ((*messages_)[position] != NULL)
+ {
+ if (getRating((*messages_)[position], rating_for_clean) == 0)
+ {
+ break;
+ }
+ else
+ {
+ untouch((*messages_)[position]);
+ }
+ }
+
+ if (++position == cacheSlots)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Rolled position at "
+ << strMsTimestamp() << ".\n"
+ << logofs_flush;
+ #endif
+
+ position = 0;
+ }
+ }
+
+ //
+ // If no message is a good candidate,
+ // then try the object at the next slot
+ // in respect to last element removed.
+ //
+
+ if (position == lastRemoved)
+ {
+ position = lastRemoved + 1;
+
+ if (position >= cacheSlots)
+ {
+ position = 0;
+ }
+
+ if ((*messages_)[position] == NULL ||
+ (*messages_)[position] -> locks_ != 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": WARNING! No message found "
+ << "to be actually removed.\n"
+ << logofs_flush;
+ #endif
+
+ return nothing;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": WARNING! Assuming object "
+ << "at position " << position << ".\n"
+ << logofs_flush;
+ #endif
+ }
+
+ return position;
+}
+
+//
+// This is the insertion method used at local side
+// side. Cache at remote side side will be kept in
+// sync by telling the to other party where to
+// store the message.
+//
+
+int MessageStore::findOrAdd(Message *message, T_checksum_action checksumAction,
+ T_data_action dataAction, int &added, int &locked)
+{
+ if (checksumAction != use_checksum)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Internal error in context [A]. "
+ << "Cannot find or add message to repository "
+ << "without using checksum.\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Internal error in context [A]. "
+ << "Cannot find or add message to repository "
+ << "without using checksum.\n";
+
+ HandleAbort();
+ }
+
+ //
+ // Set added to true only if message
+ // is inserted in cache.
+ //
+
+ added = 0;
+ locked = 0;
+
+ //
+ // First of all figure out where to
+ // store this object.
+ //
+
+ #ifdef DEBUG
+ *logofs << name() << ": Searching an empty slot "
+ << "with last rated " << lastRated << " and "
+ << "last added " << lastAdded << ".\n"
+ << logofs_flush;
+ #endif
+
+ int position = lastRated;
+
+ if (position == nothing)
+ {
+ position = lastAdded + 1;
+
+ if (position >= cacheSlots)
+ {
+ position = 0;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Searching an empty slot "
+ << "starting at position " << position
+ << " with " << checksums_ -> size()
+ << " elements in cache.\n"
+ << logofs_flush;
+ #endif
+
+ while (position != lastAdded)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Examining position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ if ((*messages_)[position] == NULL)
+ {
+ break;
+ }
+ else if (getRating((*messages_)[position], rating_for_insert) == 0)
+ {
+ break;
+ }
+ else
+ {
+ untouch((*messages_)[position]);
+ }
+
+ if (++position == cacheSlots)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Rolled position at "
+ << strMsTimestamp() << ".\n"
+ << logofs_flush;
+ #endif
+
+ position = 0;
+ }
+ }
+ }
+ #ifdef DEBUG
+ else
+ {
+ *logofs << name() << ": Using last rated position "
+ << position << ".\n" << logofs_flush;
+ }
+ #endif
+
+ //
+ // If we made an extensive check but did not
+ // find neither a free slot or a message to
+ // replace, assume slot at next position in
+ // respect to last added. This can happen if
+ // all objects in repository have got an hit
+ // recently.
+ //
+
+ if (position == lastAdded)
+ {
+ position = lastAdded + 1;
+
+ if (position >= cacheSlots)
+ {
+ position = 0;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": WARNING! Assuming slot "
+ << "at position " << position << ".\n"
+ << logofs_flush;
+ #endif
+ }
+ #ifdef DEBUG
+ else
+ {
+ *logofs << name() << ": Found candidate slot "
+ << "at position " << position << ".\n"
+ << logofs_flush;
+ }
+ #endif
+
+ //
+ // Save the search result so if the message
+ // is found in cache, we can use the slot
+ // at next run.
+ //
+
+ lastRated = position;
+
+ if ((*messages_)[position] != NULL &&
+ (*messages_)[position] -> locks_ != 0)
+ {
+ #ifdef WARNING
+ *logofs << name() << ": WARNING! Insertion at position "
+ << position << " would replace a locked message. "
+ << "Forcing channel to discard the message.\n"
+ << logofs_flush;
+ #endif
+
+ #ifdef TEST
+ *logofs << name() << ": Invalidating rating of object "
+ << "at position " << position << ".\n"
+ << logofs_flush;
+ #endif
+
+ return (lastRated = nothing);
+ }
+
+ if (checksumAction == use_checksum)
+ {
+ T_checksum checksum = getChecksum(message);
+
+ #ifdef TEST
+ *logofs << name() << ": Searching checksum ["
+ << DumpChecksum(checksum) << "] in repository.\n"
+ << logofs_flush;
+
+ #endif
+
+ pair<T_checksums::iterator, bool> result;
+
+ result = checksums_ -> insert(T_checksums::value_type(checksum, position));
+
+ //
+ // Message was found in cache or
+ // insertion couldn't take place.
+ //
+
+ if (result.second == 0)
+ {
+ if (result.first == checksums_ -> end())
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Failed to insert object "
+ << "in the cache.\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Failed to insert object of type "
+ << name() << " in the cache.\n";
+
+ return nothing;
+ }
+
+ //
+ // Message is in cache.
+ //
+
+ #ifdef TEST
+ *logofs << name() << ": Object is already in cache "
+ << "at position " << (result.first) -> second
+ << ".\n" << logofs_flush;
+ #endif
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+
+ //
+ // Message is locked, probably because
+ // it has not completely recomposed at
+ // remote side after a split.
+ //
+
+ if ((*messages_)[(result.first) -> second] -> locks_ != 0)
+ {
+ #ifdef TEST
+ *logofs << name() << ": WARNING! Object at position "
+ << (result.first) -> second << " is locked.\n"
+ << logofs_flush;
+ #endif
+
+ locked = 1;
+ }
+
+ //
+ // Object got a hit, so prevent
+ // its removal.
+ //
+
+ if (lastRated == (result.first) -> second)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Resetting rating of object "
+ << "at position " << (result.first) -> second
+ << ".\n" << logofs_flush;
+ #endif
+
+ lastRated = nothing;
+ }
+
+ return (result.first) -> second;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Could not find message in cache.\n"
+ << logofs_flush;
+ #endif
+ }
+
+ //
+ // Message not found in hash table (or insertion
+ // of checksum in hash table was not requested).
+ // Message was added to cache.
+ //
+
+ added = 1;
+
+ //
+ // Log data about the missed message.
+ //
+
+ #ifdef TEST
+
+ if (opcode() == X_PutImage || opcode() == X_NXPutPackedImage)
+ {
+ #ifdef WARNING
+ *logofs << name() << ": WARNING! Dumping identity of "
+ << "missed image object of type " << name()
+ << ".\n" << logofs_flush;
+ #endif
+
+ dumpIdentity(message);
+ }
+
+ #endif
+
+ if ((*messages_)[position] != NULL)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": The message replaces "
+ << "the old one at position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ remove(position, checksumAction, dataAction);
+ }
+
+ (*messages_)[position] = message;
+
+ //
+ // We used the slot. Perform a new
+ // search at next run.
+ //
+
+ lastRated = nothing;
+
+ #ifdef TEST
+ *logofs << name() << ": Stored message object of size "
+ << plainSize(position) << " (" << message -> size_
+ << "/" << message -> c_size_ << ") at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ unsigned int localSize;
+ unsigned int remoteSize;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ += localSize;
+ remoteStorageSize_ += remoteSize;
+
+ totalLocalStorageSize_ += localSize;
+ totalRemoteStorageSize_ += remoteSize;
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+
+ //
+ // Set hits and timestamp at insertion in cache.
+ //
+
+ message -> hits_ = control -> StoreHitsAddBonus;
+ message -> last_ = (getTimestamp()).tv_sec;
+
+ message -> locks_ = 0;
+
+ #ifdef DEBUG
+ *logofs << name() << ": Set last hit of object at "
+ << strMsTimestamp() << " with a bonus of "
+ << message -> hits_ << ".\n" << logofs_flush;
+ #endif
+
+ return position;
+}
+
+//
+// Add a parsed message to repository. It is normally used
+// at decoding side or at encoding side when we load store
+// from disk. To handle messages coming from network, the
+// encoding side uses the optimized method findOrAdd().
+//
+
+int MessageStore::add(Message *message, const int position,
+ T_checksum_action checksumAction, T_data_action dataAction)
+{
+ if (position < 0 || position >= cacheSlots)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Cannot add a message "
+ << "at non existing position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Cannot add a message "
+ << "at non existing position " << position
+ << ".\n";
+
+ HandleAbort();
+ }
+
+ if ((*messages_)[position] != NULL)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": The message will replace "
+ << "the old one at position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ remove(position, checksumAction, dataAction);
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Inserting object in repository at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ (*messages_)[position] = message;
+
+ //
+ // Get the object's checksum value
+ // and insert it in the table.
+ //
+
+ if (checksumAction == use_checksum)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Inserting object's checksum in repository.\n";
+ #endif
+
+ T_checksum checksum = getChecksum(message);
+
+ checksums_ -> insert(T_checksums::value_type(checksum, position));
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Stored message object of size "
+ << plainSize(position) << " (" << message -> size_
+ << "/" << message -> c_size_ << ") at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ unsigned int localSize;
+ unsigned int remoteSize;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ += localSize;
+ remoteStorageSize_ += remoteSize;
+
+ totalLocalStorageSize_ += localSize;
+ totalRemoteStorageSize_ += remoteSize;
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+
+ //
+ // Set hits and timestamp at insertion in cache.
+ //
+
+ message -> hits_ = control -> StoreHitsAddBonus;
+ message -> last_ = (getTimestamp()).tv_sec;
+
+ message -> locks_ = 0;
+
+ #ifdef DEBUG
+ *logofs << name() << ": Set last hit of object at "
+ << strMsTimestamp() << " with a bonus of "
+ << message -> hits_ << ".\n" << logofs_flush;
+ #endif
+
+ return position;
+}
+
+//
+// The following functions don't modify data,
+// so they are supposed to be called only at
+// the encoding side.
+//
+
+void MessageStore::updateData(const int position, unsigned int dataSize,
+ unsigned int compressedDataSize)
+{
+ Message *message = (*messages_)[position];
+
+ validateSize(dataSize, compressedDataSize);
+
+ if (compressedDataSize != 0)
+ {
+ unsigned int localSize;
+ unsigned int remoteSize;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ -= localSize;
+ remoteStorageSize_ -= remoteSize;
+
+ totalLocalStorageSize_ -= localSize;
+ totalRemoteStorageSize_ -= remoteSize;
+
+ message -> c_size_ = compressedDataSize + message -> i_size_;
+
+ #ifdef TEST
+
+ if (message -> size_ != (int) (dataSize + message -> i_size_))
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Size of object looks "
+ << message -> size_ << " bytes while it "
+ << "should be " << dataSize + message -> i_size_
+ << ".\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Size of object looks "
+ << message -> size_ << " bytes while it "
+ << "should be " << dataSize + message -> i_size_
+ << ".\n";
+
+ HandleAbort();
+ }
+
+ #endif
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ += localSize;
+ remoteStorageSize_ += remoteSize;
+
+ totalLocalStorageSize_ += localSize;
+ totalRemoteStorageSize_ += remoteSize;
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+ }
+}
+
+void MessageStore::updateData(const T_checksum checksum, unsigned int compressedDataSize)
+{
+ #ifdef TEST
+ *logofs << name() << ": Searching checksum ["
+ << DumpChecksum(checksum) << "] in repository.\n"
+ << logofs_flush;
+ #endif
+
+ T_checksums::iterator found = checksums_ -> find(checksum);
+
+ if (found != checksums_ -> end())
+ {
+ Message *message = (*messages_)[found -> second];
+
+ #ifdef TEST
+ *logofs << name() << ": Message found in cache at "
+ << "position " << found -> second << " with size "
+ << message -> size_ << " and compressed size "
+ << message -> c_size_ << ".\n" << logofs_flush;
+ #endif
+
+ updateData(found -> second, message -> size_ -
+ message -> i_size_, compressedDataSize);
+ }
+ #ifdef TEST
+ else if (checksums_ -> size() > 0)
+ {
+ *logofs << name() << ": WARNING! Can't locate the "
+ << "checksum [" << DumpChecksum(checksum)
+ << "] for the update.\n" << logofs_flush;
+ }
+ #endif
+}
+
+//
+// This function replaces the data part of the message
+// and updates the information about its size. Split
+// messages are advertised to the decoding side with
+// their uncompressed size, data is then compressed
+// before sending the first chunk. This function is
+// called by the decoding side after the split message
+// is fully recomposed to replace the dummy data and
+// set the real size.
+//
+
+void MessageStore::updateData(const int position, const unsigned char *newData,
+ unsigned int dataSize, unsigned int compressedDataSize)
+{
+ Message *message = (*messages_)[position];
+
+ validateSize(dataSize, compressedDataSize);
+
+ #ifdef TEST
+
+ if (message -> size_ != (int) (dataSize + message -> i_size_))
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Data of object looks "
+ << dataSize << " bytes while it " << "should be "
+ << message -> size_ - message -> i_size_
+ << ".\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Data of object looks "
+ << dataSize << " bytes while it " << "should be "
+ << message -> size_ - message -> i_size_
+ << ".\n";
+
+ HandleAbort();
+ }
+
+ #endif
+
+ //
+ // A compressed data size of 0 means that
+ // message's data was not compressed.
+ //
+
+ if (compressedDataSize != 0)
+ {
+ unsigned int localSize;
+ unsigned int remoteSize;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ -= localSize;
+ remoteStorageSize_ -= remoteSize;
+
+ totalLocalStorageSize_ -= localSize;
+ totalRemoteStorageSize_ -= remoteSize;
+
+ if (message -> c_size_ != (int) compressedDataSize +
+ message -> i_size_)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Resizing data of message at "
+ << "position " << position << " from " << message ->
+ c_size_ << " to " << compressedDataSize +
+ message -> i_size_ << " bytes.\n"
+ << logofs_flush;
+ #endif
+
+ message -> data_.clear();
+
+ message -> data_.resize(compressedDataSize);
+ }
+
+ memcpy(message -> data_.begin(), newData, compressedDataSize);
+
+ #ifdef TEST
+ *logofs << name() << ": Data of message at position "
+ << position << " has size " << message -> data_.size()
+ << " and capacity " << message -> data_.capacity()
+ << ".\n" << logofs_flush;
+ #endif
+
+ message -> c_size_ = compressedDataSize + message -> i_size_;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ += localSize;
+ remoteStorageSize_ += remoteSize;
+
+ totalLocalStorageSize_ += localSize;
+ totalRemoteStorageSize_ += remoteSize;
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+ }
+ else
+ {
+ #ifdef TEST
+ *logofs << name() << ": No changes to data size for message "
+ << "at position " << position << ".\n" << logofs_flush;
+ #endif
+
+ memcpy(message -> data_.begin(), newData, dataSize);
+ }
+}
+
+int MessageStore::remove(const int position, T_checksum_action checksumAction,
+ T_data_action dataAction)
+{
+ Message *message;
+
+ if (position < 0 || position >= cacheSlots ||
+ (message = (*messages_)[position]) == NULL)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Cannot remove "
+ << "a non existing message at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Cannot remove "
+ << "a non existing message at position "
+ << position << ".\n";
+
+ HandleAbort();
+ }
+
+ #if defined(TEST) || defined(INFO)
+
+ if (opcode() == X_PutImage || opcode() == X_NXPutPackedImage)
+ {
+ #ifdef WARNING
+ *logofs << name() << ": WARNING! Discarding image object "
+ << "of type " << name() << " at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+ }
+
+ #endif
+
+ //
+ // The checksum is only stored at the encoding
+ // side.
+ //
+
+ if (checksumAction == use_checksum)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": Removing checksum for object at "
+ << "position " << position << ".\n" << logofs_flush;
+ #endif
+
+ //
+ // TODO: If we had stored the iterator and
+ // not the pointer to the message, we could
+ // have removed the message without having
+ // to look up the checksum.
+ //
+
+ T_checksum checksum = getChecksum(message);
+
+ #ifdef TEST
+ *logofs << name() << ": Searching checksum ["
+ << DumpChecksum(checksum) << "] in repository.\n"
+ << logofs_flush;
+ #endif
+
+ T_checksums::iterator found = checksums_ -> find(checksum);
+
+ if (found == checksums_ -> end())
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! No checksum found for "
+ << "object at position " << position << ".\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": No checksum found for "
+ << "object at position " << position << ".\n";
+
+ HandleAbort();
+ }
+
+ #ifdef TEST
+
+ else if (position != found -> second)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Value of position for object "
+ << "doesn't match position " << position << ".\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Value of position for object "
+ << "doesn't match position " << position << ".\n";
+
+ HandleAbort();
+ }
+
+ #endif
+
+ checksums_ -> erase(found);
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Removing message at position "
+ << position << " of size " << plainSize(position)
+ << " (" << message -> size_ << "/" << message -> c_size_
+ << ").\n" << logofs_flush;
+ #endif
+
+ unsigned int localSize;
+ unsigned int remoteSize;
+
+ storageSize(message, localSize, remoteSize);
+
+ localStorageSize_ -= localSize;
+ remoteStorageSize_ -= remoteSize;
+
+ totalLocalStorageSize_ -= localSize;
+ totalRemoteStorageSize_ -= remoteSize;
+
+ recycle(message);
+
+ (*messages_)[position] = NULL;
+
+ #ifdef DEBUG
+
+ printStorageSize();
+
+ #endif
+
+ return position;
+}
+
+//
+// This should only be called at encoding side.
+// The decoding side can't rely on the counter
+// as it is decremented by the encoding side
+// every time the repository is searched for a
+// message to be removed.
+//
+
+int MessageStore::getRating(Message *message, T_rating type) const
+{
+ if (message -> locks_ != 0)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Rate set to -1 as locks of object are "
+ << (int) message -> locks_ << ".\n"
+ << logofs_flush;
+ #endif
+
+ return -1;
+ }
+ else if ((type == rating_for_clean ||
+ (int) checksums_ -> size() == cacheSlots) &&
+ message -> hits_ <= control -> StoreHitsLoadBonus)
+ {
+ //
+ // We don't have any free slot or we exceeded the
+ // available storage size. This is likely to happen
+ // after having loaded objects from persistent cache.
+ // It's not a bad idea to discard some messages that
+ // were restored but never referenced.
+ //
+
+ #ifdef TEST
+
+ if (type == rating_for_clean)
+ {
+ *logofs << name() << ": Rate set to 0 with hits "
+ << message -> hits_ << " as maximum storage size "
+ << "was exceeded.\n" << logofs_flush;
+ }
+ else
+ {
+ *logofs << name() << ": Rate set to 0 with hits "
+ << message -> hits_ << " as there are no available "
+ << "slots in store.\n" << logofs_flush;
+ }
+
+ #endif
+
+ return 0;
+ }
+ else if (type == rating_for_clean &&
+ (getTimestamp()).tv_sec - message -> last_ >=
+ control -> StoreTimeLimit)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Rate set to 0 as last hit of object was "
+ << (getTimestamp()).tv_sec - message -> last_
+ << " seconds ago with limit set to " << control ->
+ StoreTimeLimit << ".\n" << logofs_flush;
+ #endif
+
+ return 0;
+ }
+ else
+ {
+ #ifdef TEST
+ if (message -> hits_ < 0)
+ {
+ *logofs << name() << ": PANIC! Rate of object shouldn't be "
+ << message -> hits_ << ".\n" << logofs_flush;
+
+ cerr << "Error" << ": Rate of object of type " << name()
+ << " shouldn't be " << message -> hits_ << ".\n";
+
+ HandleAbort();
+ }
+ #endif
+
+ #ifdef TEST
+ *logofs << name() << ": Rate of object is " << message -> hits_
+ << " with last hit " << (getTimestamp()).tv_sec -
+ message -> last_ << " seconds ago.\n"
+ << logofs_flush;
+ #endif
+
+ return message -> hits_;
+ }
+}
+
+int MessageStore::touch(Message *message) const
+{
+ message -> last_ = (getTimestamp()).tv_sec;
+
+ message -> hits_ += control -> StoreHitsTouch;
+
+ if (message -> hits_ > control -> StoreHitsLimit)
+ {
+ message -> hits_ = control -> StoreHitsLimit;
+ }
+
+ #ifdef TEST
+ *logofs << name() << ": Increased hits of object to "
+ << message -> hits_ << " at " << strMsTimestamp()
+ << ".\n" << logofs_flush;
+ #endif
+
+ return message -> hits_;
+}
+
+int MessageStore::untouch(Message *message) const
+{
+ message -> hits_ -= control -> StoreHitsUntouch;
+
+ if (message -> hits_ < 0)
+ {
+ message -> hits_ = 0;
+ }
+
+ #ifdef TEST
+ *logofs << name() << ": Decreased hits of object to "
+ << message -> hits_ << ".\n"
+ << logofs_flush;
+ #endif
+
+ return message -> hits_;
+}
+
+int MessageStore::lock(const int position) const
+{
+ Message *message = (*messages_)[position];
+
+ if (message == NULL)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't lock the null "
+ << "object at position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ return -1;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Increasing locks of object to "
+ << (int) message -> locks_ + 1 << ".\n"
+ << logofs_flush;
+ #endif
+
+ return ++(message -> locks_);
+}
+
+int MessageStore::unlock(const int position) const
+{
+ Message *message = (*messages_)[position];
+
+ if (message == NULL)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't unlock the null "
+ << "object at position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ return -1;
+ }
+
+ #ifdef DEBUG
+ *logofs << name() << ": Decreasing locks of object to "
+ << (int) message -> locks_ - 1 << ".\n"
+ << logofs_flush;
+ #endif
+
+ return --(message -> locks_);
+}
+
+int MessageStore::saveStore(ostream *cachefs, md5_state_t *md5StateStream,
+ md5_state_t *md5StateClient, T_checksum_action checksumAction,
+ T_data_action dataAction, int bigEndian)
+{
+ Message *message;
+
+ #ifdef TEST
+ *logofs << name() << ": Opcode of this store is "
+ << (unsigned int) opcode() << " default size of "
+ << "identity is " << dataOffset << ".\n"
+ << logofs_flush;
+ #endif
+
+ unsigned char *identityBuffer = new unsigned char[dataOffset];
+ unsigned char *sizeBuffer = new unsigned char[4 * 2];
+ unsigned char *positionBuffer = new unsigned char[4];
+ unsigned char *opcodeBuffer = new unsigned char[4];
+
+ #ifdef DUMP
+
+ char *md5ClientDump = new char[dataOffset * 2 + 128];
+
+ #endif
+
+ unsigned char value;
+
+ int offset;
+
+ int failed = 0;
+
+ for (int position = 0; position < cacheSlots; position++)
+ {
+ message = (*messages_)[position];
+
+ //
+ // Don't save split messages.
+ //
+
+ if (message != NULL && message -> locks_ == 0)
+ {
+ //
+ // Use the total size if offset is
+ // beyond the real end of message.
+ //
+
+ offset = dataOffset;
+
+ if (offset > message -> size_)
+ {
+ offset = message -> size_;
+ }
+
+ #ifdef TEST
+ *logofs << name() << ": Going to save message at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ value = 1;
+
+ PutULONG(position, positionBuffer, bigEndian);
+ PutULONG(opcode(), opcodeBuffer, bigEndian);
+
+ md5_append(md5StateClient, positionBuffer, 4);
+ md5_append(md5StateClient, opcodeBuffer, 4);
+
+ #ifdef DUMP
+
+ *logofs << "Name=" << name() << logofs_flush;
+
+ sprintf(md5ClientDump," Pos=%d Op=%d\n", position, opcode());
+
+ *logofs << md5ClientDump << logofs_flush;
+
+ #endif
+
+ if (PutData(cachefs, &value, 1) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << 1
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, &value, 1);
+
+ PutULONG(message -> size_, sizeBuffer, bigEndian);
+ PutULONG(message -> c_size_, sizeBuffer + 4, bigEndian);
+
+ //
+ // Note that the identity size is not saved with
+ // the message and will be determined from the
+ // data read when restoring the identity.
+ //
+
+ if (PutData(cachefs, sizeBuffer, 4 * 2) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << 4 * 2
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, sizeBuffer, 4 * 2);
+ md5_append(md5StateClient, sizeBuffer, 4 * 2);
+
+ #ifdef DUMP
+
+ sprintf(md5ClientDump, "size = %d c_size = %d\n",
+ message -> size_, message -> c_size_);
+
+ *logofs << md5ClientDump << logofs_flush;
+
+ #endif
+
+ //
+ // Prepare a clean buffer for unparse.
+ //
+
+ CleanData(identityBuffer, offset);
+
+ unparseIdentity(message, identityBuffer, offset, bigEndian);
+
+ if (PutData(cachefs, identityBuffer, offset) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << offset
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, identityBuffer, offset);
+ md5_append(md5StateClient, identityBuffer, offset);
+
+ #ifdef DUMP
+
+ for (int i = 0; i < offset; i++)
+ {
+ sprintf(md5ClientDump + (i * 2), "%02X", identityBuffer[i]);
+ }
+
+ *logofs << "Identity = " << md5ClientDump << "\n" << logofs_flush;
+
+ #endif
+
+ //
+ // Set the real identity size before
+ // saving the data.
+ //
+
+ offset = message -> i_size_;
+
+ if (offset > message -> size_)
+ {
+ offset = message -> size_;
+ }
+
+ if (checksumAction == use_checksum)
+ {
+ if (PutData(cachefs, message -> md5_digest_, MD5_LENGTH) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << MD5_LENGTH
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, message -> md5_digest_, MD5_LENGTH);
+ }
+ else if (dataAction == use_data)
+ {
+ int dataSize = (message -> c_size_ == 0 ?
+ message -> size_ - offset :
+ message -> c_size_ - offset);
+ if (dataSize > 0)
+ {
+ if (PutData(cachefs, message -> data_.begin(), dataSize) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << dataSize
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, message -> data_.begin(), dataSize);
+ }
+ }
+ }
+ else
+ {
+ #ifdef TEST
+ *logofs << name() << ": Not saving message at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ value = 0;
+
+ if (PutData(cachefs, &value, 1) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure writing " << 1
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, &value, 1);
+ }
+ }
+
+ if (failed == 1)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Write to persistent cache file failed.\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Write to persistent cache file failed.\n";
+ }
+
+ delete [] identityBuffer;
+ delete [] sizeBuffer;
+ delete [] positionBuffer;
+ delete [] opcodeBuffer;
+
+ #ifdef DUMP
+
+ delete [] md5ClientDump;
+
+ #endif
+
+ return (failed == 0 ? 1 : -1);
+}
+
+int MessageStore::loadStore(istream *cachefs, md5_state_t *md5StateStream,
+ T_checksum_action checksumAction, T_data_action dataAction,
+ int bigEndian)
+{
+ Message *message;
+
+ #ifdef TEST
+ *logofs << name() << ": Opcode of this store is "
+ << (unsigned int) opcode() << " default size of "
+ << "identity is " << dataOffset << " slots are "
+ << cacheSlots << ".\n" << logofs_flush;
+ #endif
+
+ //
+ // If packed images or the render extension has been
+ // disabled we don't need to restore these messages
+ // in the cache. Encoding of RENDER in 1.4.0 is also
+ // changed so we want to skip messages saved using
+ // the old format. We want to restore all the other
+ // messages so we'll need to skip these one by one.
+ //
+
+ int skip = 0;
+
+ if ((opcode() == X_NXPutPackedImage &&
+ control -> PersistentCacheLoadPacked == 0) ||
+ (opcode() == X_NXInternalRenderExtension &&
+ control -> PersistentCacheLoadRender == 0))
+ {
+ #ifdef TEST
+ *logofs << name() << ": All messages for OPCODE#"
+ << (unsigned int) opcode() << " will be discarded.\n"
+ << logofs_flush;
+ #endif
+
+ skip = 1;
+ }
+
+ unsigned char *identityBuffer = new unsigned char[dataOffset];
+ unsigned char *sizeBuffer = new unsigned char[4 * 2];
+
+ unsigned char value;
+
+ int offset;
+
+ int failed = 0;
+
+ for (int position = 0; position < cacheSlots; position++)
+ {
+ if (GetData(cachefs, &value, 1) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << 1
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, &value, 1);
+
+ if (value == 1)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Going to load message at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ if (GetData(cachefs, sizeBuffer, 4 * 2) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << 4 * 2
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, sizeBuffer, 4 * 2);
+
+ message = getTemporary();
+
+ if (message == NULL)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't access temporary storage "
+ << "for message in context [B].\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Can't access temporary storage "
+ << "for message in context [B].\n";
+
+ failed = 1;
+
+ break;
+ }
+
+ message -> size_ = GetULONG(sizeBuffer, bigEndian);
+ message -> c_size_ = GetULONG(sizeBuffer + 4, bigEndian);
+
+ #ifdef DEBUG
+ *logofs << name() << ": Size is " << message -> size_
+ << " compressed size is " << message -> c_size_
+ << ".\n" << logofs_flush;
+ #endif
+
+ //
+ // Use the total size if offset is
+ // beyond the real end of message.
+ //
+
+ offset = dataOffset;
+
+ if (offset > message -> size_)
+ {
+ offset = message -> size_;
+ }
+
+ if (GetData(cachefs, identityBuffer, offset) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << offset
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, identityBuffer, offset);
+
+ //
+ // Get the real identity size based on the value
+ // reported by the message store. The dataOffset
+ // value is guaranteed to be greater or equal to
+ // the maximum identity size of the messages in
+ // the major store.
+ //
+
+ offset = identitySize(identityBuffer, offset);
+
+ if (offset > message -> size_)
+ {
+ offset = message -> size_;
+ }
+
+ message -> i_size_ = offset;
+
+ //
+ // Get identity of message from the buffer we just
+ // created. Don't calculate neither checksum nor
+ // data, restore them from stream. Don't pass the
+ // message's size but the default size of identity.
+ //
+
+ parseIdentity(message, identityBuffer, offset, bigEndian);
+
+ if (checksumAction == use_checksum)
+ {
+ if (message -> md5_digest_ == NULL)
+ {
+ message -> md5_digest_ = new md5_byte_t[MD5_LENGTH];
+ }
+
+ if (GetData(cachefs, message -> md5_digest_, MD5_LENGTH) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << MD5_LENGTH
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ //
+ // Add message's checksum to checksum that will
+ // be saved together with this cache. Checksum
+ // will be verified when cache file is restored
+ // to ensure file is not corrupted.
+ //
+
+ md5_append(md5StateStream, message -> md5_digest_, MD5_LENGTH);
+
+ if (skip == 1)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Discarding message for OPCODE#"
+ << (unsigned int) opcode() << ".\n"
+ << logofs_flush;
+ #endif
+
+ continue;
+ }
+ }
+ else if (dataAction == use_data)
+ {
+ //
+ // Restore the data part.
+ //
+
+ int dataSize = (message -> c_size_ == 0 ?
+ message -> size_ - offset :
+ message -> c_size_ - offset);
+
+ if (dataSize < 0 || dataSize > control -> MaximumMessageSize)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Bad data size "
+ << dataSize << " loading persistent cache.\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Bad data size " << dataSize
+ << " loading persistent cache.\n";
+
+ failed = 1;
+
+ break;
+ }
+ else if (dataSize > 0)
+ {
+ //
+ // If need to skip the message let anyway
+ // it to be part of the calculated MD5.
+ //
+
+ if (skip == 1)
+ {
+ unsigned char *dummy = new unsigned char[dataSize];
+
+ if (dummy == NULL)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't allocate dummy buffer "
+ << "of size " << dataSize << " loading cache.\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Can't allocate dummy buffer "
+ << "of size " << dataSize << " loading cache.\n";
+
+ failed = 1;
+
+ break;
+ }
+
+ if (GetData(cachefs, dummy, dataSize) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << dataSize
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ md5_append(md5StateStream, dummy, dataSize);
+
+ delete [] dummy;
+
+ #ifdef TEST
+ *logofs << name() << ": Discarding message for OPCODE#"
+ << (unsigned int) opcode() << ".\n"
+ << logofs_flush;
+ #endif
+
+ continue;
+ }
+ else
+ {
+ message -> data_.clear();
+
+ message -> data_.resize(dataSize);
+
+ if (GetData(cachefs, message -> data_.begin(), dataSize) < 0)
+ {
+ #ifdef DEBUG
+ *logofs << name() << ": PANIC! Failure reading " << dataSize
+ << " bytes.\n" << logofs_flush;
+ #endif
+
+ failed = 1;
+
+ break;
+ }
+
+ //
+ // Add message's data to cache checksum.
+ //
+
+ md5_append(md5StateStream, message -> data_.begin(), dataSize);
+ }
+ }
+ else
+ {
+ //
+ // We are here if data part is zero.
+ //
+
+ if (skip == 1)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Discarding message for OPCODE#"
+ << (unsigned int) opcode() << ".\n"
+ << logofs_flush;
+ #endif
+
+ continue;
+ }
+ }
+ }
+
+ int added;
+
+ added = add(message, position, checksumAction, dataAction);
+
+ if (added != position)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't store message "
+ << "in the cache at position " << position
+ << ".\n" << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Can't store message "
+ << "in the cache at position " << position
+ << ".\n";
+
+ failed = 1;
+
+ break;
+ }
+ else
+ {
+ //
+ // Replace default value of hits set by add
+ // function. Messages read from cache start
+ // with a lower bonus than fresh messages
+ // inserted.
+ //
+
+ message -> hits_ = control -> StoreHitsLoadBonus;
+
+ #ifdef DEBUG
+ *logofs << name() << ": Updated last hit of object at "
+ << strMsTimestamp() << " with a bonus of "
+ << message -> hits_ << ".\n" << logofs_flush;
+ #endif
+
+ resetTemporary();
+ }
+ }
+ else if ((*messages_)[position] != NULL)
+ {
+ #ifdef TEST
+ *logofs << name() << ": Going to remove message at position "
+ << position << ".\n" << logofs_flush;
+ #endif
+
+ int removed;
+
+ removed = remove(position, checksumAction, dataAction);
+
+ if (removed != position)
+ {
+ #ifdef PANIC
+ *logofs << name() << ": PANIC! Can't remove message from cache "
+ << "at position " << position << ".\n"
+ << logofs_flush;
+ #endif
+
+ cerr << "Error" << ": Can't remove message from cache "
+ << "at position " << position << ".\n";
+
+ failed = 1;
+
+ break;
+ }
+ }
+ #ifdef TEST
+ else
+ {
+ *logofs << name() << ": Not loading message at position "
+ << position << ".\n" << logofs_flush;
+ }
+ #endif
+ }
+
+ #ifdef WARNING
+
+ if (failed == 1)
+ {
+ *logofs << name() << ": WARNING! Read from persistent cache file failed.\n"
+ << logofs_flush;
+ }
+
+ #endif
+
+ delete [] identityBuffer;
+ delete [] sizeBuffer;
+
+ return (failed == 0 ? 1 : -1);
+}
+
+void MessageStore::storageSize(const Message *message, unsigned int &local,
+ unsigned int &remote) const
+{
+ local = remote = storage();
+
+ //
+ // Encoding side includes 48 bytes for
+ // the map of checksums and 24 bytes
+ // of adjustment for total overhead.
+ //
+
+ local += MD5_LENGTH + 48 + 24;
+
+ //
+ // At decoding side we include size of
+ // data part and 24 bytes of adjustment
+ // for total overhead.
+ //
+
+ if (message -> c_size_ == 0)
+ {
+ remote += message -> size_ + 24;
+ }
+ else
+ {
+ remote += message -> c_size_ + 24;
+ }
+
+ //
+ // Check if we are the encoding or the
+ // decoding side and, if needed, swap
+ // the values.
+ //
+
+ if (message -> md5_digest_ == NULL)
+ {
+ unsigned int t = local;
+
+ local = remote;
+
+ remote = t;
+ }
+}
+
+void MessageStore::printStorageSize()
+{
+ #ifdef TEST
+
+ *logofs << name() << ": There are "
+ << checksums_ -> size() << " checksums in this store "
+ << "out of " << cacheSlots << " slots.\n"
+ << logofs_flush;
+
+ *logofs << name() << ": Size of this store is "
+ << localStorageSize_ << " bytes at local side and "
+ << remoteStorageSize_ << " bytes at remote side.\n"
+ << logofs_flush;
+
+ *logofs << name() << ": Size of total cache is "
+ << totalLocalStorageSize_ << " bytes at local side and "
+ << totalRemoteStorageSize_ << " bytes at remote side.\n"
+ << logofs_flush;
+
+ #endif
+}