/* Origin: nxcomp/src/Transport.h (blob ba07e13d4304fdc4080de505ff68f254ec5aac56) */
/**************************************************************************/
/*                                                                        */
/* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com)          */
/* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de>  */
/* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de>                 */
/* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de>                */
/* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
/* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com)           */
/*                                                                        */
/* NXCOMP, NX protocol compression and NX extensions to this software     */
/* are copyright of the aforementioned persons and companies.             */
/*                                                                        */
/* Redistribution and use of the present software is allowed according    */
/* to terms specified in the file LICENSE.nxcomp which comes in the       */
/* source distribution.                                                   */
/*                                                                        */
/* All rights reserved.                                                   */
/*                                                                        */
/* NOTE: This software has received contributions from various other      */
/* contributors, only the core maintainers and supporters are listed as   */
/* copyright holders. Please contact us, if you feel you should be listed */
/* as copyright holder, as well.                                          */
/*                                                                        */
/**************************************************************************/

#ifndef Transport_H
#define Transport_H

#include <zlib.h>
#include <errno.h>

#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "Misc.h"
#include "Control.h"

#include "Types.h"
#include "Timestamp.h"
#include "Socket.h"

//
// Set the verbosity level. As configured
// here PANIC and WARNING are defined while
// TEST and DEBUG are explicitly undefined.
//
// NOTE(review): nothing in this header tests
// these macros; presumably they select the
// log statements compiled into the files
// including it - confirm against the
// implementation.
//

#define PANIC
#define WARNING
#undef  TEST
#undef  DEBUG

//
// Define this to lock and unlock the
// memory-to-memory transport buffers
// before they are accessed. The code
// is outdated and doesn't work with
// the current pthread library.
//
// When defined, AgentTransport declares
// the lockRead(), lockWrite(), unlockRead()
// and unlockWrite() members and two
// pthread_mutex_t data members.
//

#undef  THREADS

//
// Define this to know when a socket
// is created or destroyed.
//
// When defined, each transport class
// declares a private static references_
// member.
//

#undef  REFERENCES

//
// Size of buffer if not set by user.
//
// NOTE(review): presumably expressed in
// bytes - confirm against the code that
// uses it.
//

#define TRANSPORT_BUFFER_DEFAULT_SIZE         16384

//
// Type of transport.
//

typedef enum
{
  //
  // Tags returned by Transport::getType().
  //
  // NOTE(review): the code assigning these
  // tags is not in this header; the mapping
  // to Transport, ProxyTransport and
  // AgentTransport is inferred from the
  // names - confirm.
  //

  transport_base = 0,
  transport_proxy,
  transport_agent,

  //
  // Last enumerator. Presumably a sentinel
  // rather than a real transport type -
  // confirm.
  //

  transport_last_tag
} T_transport_type;

//
// This class handles the buffered I/O on
// the network sockets.
//

//
// TODO: This class is useful but adds a lot of
// overhead. There are many improvements we can
// make here:
//
// - There should be a generic Buffer class, ac-
//   commodating a list of memory buffers. This
//   would enable the use of the readv() and
//   writev() functions to perform the I/O on
//   the socket.
//
// - The buffering should be moved to the Write-
//   Buffer and ReadBuffer classes. By performing
//   the buffering here and there, we are dupli-
//   cating a lot of code and are adding a lot
//   of useless memory copies.
//
// - Stream compression should be removed. The
//   proxy should compress the frames based on
//   the type and should include the length of
//   the decompressed data in the header of the
//   packet. Besides avoiding the compression
//   of packets that cannot be reduced in size,
//   we would also save the additional memory
//   allocations due to the fact that we don't
//   know the size of the decode buffer at the
//   time we read the packet from the network.
//
// - The other utilities implemented here, like
//   the functions forcing a write on the socket
//   or waiting for more data to become available
//   should be moved to the Proxy or the Channel
//   classes.
//

//
// Base transport. It keeps a descriptor, a
// single write buffer, the three buffer size
// parameters and a type tag. Most of the
// virtual members have trivial defaults that
// the derived classes override.
//

class Transport
{
  public:

  //
  // Member functions.
  //

  Transport(int fd);

  virtual ~Transport();

  //
  // Return the descriptor stored in fd_.
  //

  int fd() const
  {
    return fd_;
  }

  //
  // Return the type tag stored in type_.
  // Const-qualified like the other accessors,
  // so that it can be called through a const
  // Transport.
  //

  T_transport_type getType() const
  {
    return type_;
  }

  //
  // Virtual members redefined by proxy
  // and 'memory-to-memory' I/O layers.
  // The base versions are defined out
  // of line.
  //

  virtual int read(unsigned char *data, unsigned int size);

  virtual int write(T_write type, const unsigned char *data, const unsigned int size);

  virtual int flush();

  virtual int drain(int limit, int timeout);

  //
  // Call fullReset() and then set the
  // finish_ flag to 1.
  //

  virtual void finish()
  {
    fullReset();

    finish_ = 1;
  }

  //
  // Value of the length_ field of the
  // write buffer.
  //

  virtual int length() const
  {
    return w_buffer_.length_;
  }

  //
  // The base class has no read buffer,
  // so it always reports 0 here.
  //

  virtual int pending() const
  {
    return 0;
  }

  //
  // The following three forward the query
  // to the GetBytesReadable(), GetBytesWritable()
  // and GetBytesQueued() helpers, passing fd_.
  // The helpers are declared outside this
  // header.
  //

  virtual int readable() const
  {
    return GetBytesReadable(fd_);
  }

  virtual int writable() const
  {
    return GetBytesWritable(fd_);
  }

  virtual int queued() const
  {
    return GetBytesQueued(fd_);
  }

  //
  // Always 0 in the base class.
  //

  virtual int flushable() const
  {
    return 0;
  }

  virtual int wait(int timeout) const;

  //
  // Defined out of line.
  //
  // NOTE(review): presumably stores the
  // values in initialSize_, thresholdSize_
  // and maximumSize_ - confirm.
  //

  void setSize(unsigned int initialSize,
               unsigned int thresholdSize,
               unsigned int maximumSize);

  //
  // Return a pointer to the data
  // in the read buffer. The base
  // version sets the pointer to
  // NULL and returns 0.
  //

  virtual unsigned int getPending(unsigned char *&data)
  {
    data = NULL;

    return 0;
  }

  //
  // Does nothing in the base class.
  //

  virtual void pendingReset()
  {
  }

  //
  // Apply the conditional reset below
  // to the write buffer.
  //

  virtual void partialReset()
  {
    partialReset(w_buffer_);
  }

  virtual void fullReset();

  //
  // Return the value of blocked_.
  //

  int blocked() const
  {
    return blocked_;
  }

  protected:

  //
  // Make room in the buffer to accommodate
  // at least size bytes.
  //

  int resize(T_buffer &buffer, const int &size);

  //
  // Pass the buffer to fullReset() only if
  // it is empty (length_ is 0) and either
  // the size or the capacity of its data_
  // exceeds initialSize_. Otherwise leave
  // the buffer untouched.
  //

  void partialReset(T_buffer &buffer)
  {
    if (buffer.length_ == 0 &&
            (buffer.data_.size() > initialSize_ ||
                 buffer.data_.capacity() > initialSize_))
    {
      fullReset(buffer);
    }
  }

  void fullReset(T_buffer &buffer);

  //
  // Data members.
  //

  int fd_;

  int blocked_;
  int finish_;

  T_buffer w_buffer_;

  unsigned int initialSize_;
  unsigned int thresholdSize_;
  unsigned int maximumSize_;

  T_transport_type type_;

  private:

  //
  // Only declared when REFERENCES
  // is defined.
  //

  #ifdef REFERENCES

  static int references_;

  #endif
};

//
// This class handles buffered I/O and
// compression of the proxy stream.
//

//
// On top of Transport this class adds a read
// buffer, a pair of zlib streams (one named
// for reading, one for writing) and the
// flush_ and owner_ flags.
//

class ProxyTransport : public Transport
{
  public:

  ProxyTransport(int fd);

  virtual ~ProxyTransport();

  //
  // I/O entry points redefined for the
  // proxy link. All three are defined
  // out of line.
  //

  virtual int read(unsigned char *data, unsigned int size);

  virtual int write(T_write type, const unsigned char *data, const unsigned int size);

  virtual int flush();

  //
  // The following members are not redeclared
  // and are taken unchanged from Transport:
  //
  //   drain(), finish(), length(), readable(),
  //   writable(), queued(), wait(), setSize(),
  //   blocked()
  //
  // NOTE(review): earlier revisions of this
  // comment also listed 'int drained() const'
  // as inherited but not to be called. No such
  // member is declared by Transport in this
  // header.
  //

  //
  // Value of the length_ field of the
  // read buffer.
  //

  virtual int pending() const
  {
    return r_buffer_.length_;
  }

  //
  // Report the current value of flush_.
  //

  virtual int flushable() const
  {
    return flush_;
  }

  virtual unsigned int getPending(unsigned char *&data);

  //
  // Set owner_ to 1. This is the condition
  // checked by partialReset() before it
  // touches the read buffer.
  //

  virtual void pendingReset()
  {
    owner_ = 1;
  }

  //
  // Run the conditional reset of the base
  // class on the read buffer, but only when
  // owner_ is 1, and then always on the
  // write buffer.
  //

  virtual void partialReset()
  {
    const bool resetReadBuffer = (owner_ == 1);

    if (resetReadBuffer)
    {
      Transport::partialReset(r_buffer_);
    }

    Transport::partialReset(w_buffer_);
  }

  virtual void fullReset();

  protected:

  //
  // Data members.
  //

  int flush_;
  int owner_;

  T_buffer r_buffer_;

  z_stream r_stream_;
  z_stream w_stream_;

  private:

  //
  // Only declared when REFERENCES
  // is defined.
  //

  #ifdef REFERENCES

  static int references_;

  #endif
};

//
// Handle memory-to-memory data transfers between
// an agent and the proxy.
//

//
// On top of Transport this class adds a read
// buffer, the owner_ flag and the enqueue()
// and dequeue() family of members. Queries
// on the readable and pending data are
// answered from the read buffer instead of
// the descriptor.
//

class AgentTransport : public Transport
{
  public:

  AgentTransport(int fd);

  virtual ~AgentTransport();

  virtual int read(unsigned char *data, unsigned int size);

  virtual int write(T_write type, const unsigned char *data, const unsigned int size);

  //
  // These two should never be called.
  // Both are declared as not returning
  // to the caller.
  //

  virtual int flush() __attribute__((noreturn));

  virtual int drain(int limit, int timeout) __attribute__((noreturn));

  //
  // Same as in the base class.
  //
  // virtual void finish();
  //

  //
  // Same as in the base class.
  //
  // virtual int length() const
  //

  //
  // Value of the length_ field of the
  // read buffer.
  //

  virtual int pending() const
  {
    return r_buffer_.length_;
  }

  //
  // These are intended to operate only
  // on the internal buffers. The readable
  // amount is the length of the read buffer,
  // the writable amount is always the
  // TransportMaximumBufferSize taken from
  // the global control object and nothing
  // is ever reported as queued.
  //

  virtual int readable() const
  {
    return r_buffer_.length_;
  }

  virtual int writable() const
  {
    return control -> TransportMaximumBufferSize;
  }

  virtual int queued() const
  {
    return 0;
  }

  //
  // Same as in the base class.
  //
  // virtual int flushable() const;
  //
  // Same as in the base class, but
  // should not be called.
  //
  // int drained() const;
  //

  //
  // This version never blocks. It
  // ignores the timeout and returns
  // 0 immediately.
  //

  virtual int wait(int timeout) const
  {
    return 0;
  }

  //
  // Same as in the base class.
  //
  // void setSize(unsigned int initialSize,
  //                  unsigned int thresholdSize,
  //                      unsigned int maximumSize);
  //

  virtual unsigned int getPending(unsigned char *&data);

  //
  // Set owner_ to 1. This is the condition
  // checked by partialReset() before it
  // touches the read buffer.
  //

  virtual void pendingReset()
  {
    owner_ = 1;
  }

  //
  // Run the conditional reset of the base
  // class on the read buffer, but only when
  // owner_ is 1, and then always on the
  // write buffer.
  //

  virtual void partialReset()
  {
    if (owner_ == 1)
    {
      Transport::partialReset(r_buffer_);
    }

    Transport::partialReset(w_buffer_);
  }

  virtual void fullReset();

  //
  // Same as in the base class.
  //
  // int blocked() const;
  //

  //
  // The following are specific to the
  // memory-to-memory transport. Except
  // for queuable() they are defined out
  // of line.
  //

  int enqueue(const char *data, const int size);

  int dequeue(char *data, int size);

  int queuable() const
  {
    //
    // Always allow the agent to enqueue
    // more data.
    //

    return control -> TransportMaximumBufferSize;
  }

  int dequeuable();

  protected:

  //
  // Lock the buffer to handle reads and
  // writes safely. Only declared when
  // THREADS is defined.
  //

  #ifdef THREADS

  int lockRead();
  int lockWrite();

  int unlockRead();
  int unlockWrite();

  #endif

  //
  // Data members.
  //

  int owner_;

  T_buffer r_buffer_;

  //
  // Mutexes for safe read and write.
  // Only declared when THREADS is
  // defined.
  //

  #ifdef THREADS

  pthread_mutex_t m_read_;
  pthread_mutex_t m_write_;

  #endif

  private:

  //
  // Only declared when REFERENCES
  // is defined.
  //

  #ifdef REFERENCES

  static int references_;

  #endif
};

#endif /* Transport_H */