Create common base classes for buffered streams

Most streams are backed by a memory buffer. Create common base classes
for this functionality to avoid code duplication.
pull/36/head
Pierre Ossman 5 years ago committed by Lauri Kasanen
parent 7f90205cf2
commit 92c7695981

@ -0,0 +1,72 @@
/* Copyright (C) 2002-2005 RealVNC Ltd. All Rights Reserved.
* Copyright 2020 Pierre Ossman for Cendio AB
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
* USA.
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <rdr/BufferedInStream.h>
#include <rdr/Exception.h>
using namespace rdr;
static const size_t DEFAULT_BUF_SIZE = 8192;
BufferedInStream::BufferedInStream(size_t bufSize_)
: bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
{
ptr = end = start = new U8[bufSize];
}
BufferedInStream::~BufferedInStream()
{
delete [] start;
}
size_t BufferedInStream::pos()
{
return offset + ptr - start;
}
size_t BufferedInStream::overrun(size_t itemSize, size_t nItems, bool wait)
{
if (itemSize > bufSize)
throw Exception("BufferedInStream overrun: "
"requested size of %lu bytes exceeds maximum of %lu bytes",
(long unsigned)itemSize, (long unsigned)bufSize);
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
offset += ptr - start;
end -= ptr - start;
ptr = start;
while (avail() < itemSize) {
if (!fillBuffer(start + bufSize - end, wait))
return 0;
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
}

@ -0,0 +1,54 @@
/* Copyright (C) 2002-2005 RealVNC Ltd. All Rights Reserved.
* Copyright 2020 Pierre Ossman for Cendio AB
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
* USA.
*/
//
// Base class for input streams with a buffer
//
#ifndef __RDR_BUFFEREDINSTREAM_H__
#define __RDR_BUFFEREDINSTREAM_H__
#include <rdr/InStream.h>
namespace rdr {
class BufferedInStream : public InStream {
public:
virtual ~BufferedInStream();
virtual size_t pos();
private:
virtual bool fillBuffer(size_t maxSize, bool wait) = 0;
virtual size_t overrun(size_t itemSize, size_t nItems, bool wait);
private:
size_t bufSize;
size_t offset;
U8* start;
protected:
BufferedInStream(size_t bufSize=0);
};
} // end of namespace rdr
#endif

@ -0,0 +1,115 @@
/* Copyright (C) 2002-2005 RealVNC Ltd. All Rights Reserved.
* Copyright 2011-2020 Pierre Ossman for Cendio AB
* Copyright 2017 Peter Astrand <astrand@cendio.se> for Cendio AB
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
* USA.
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <rdr/BufferedOutStream.h>
#include <rdr/Exception.h>
using namespace rdr;
static const size_t DEFAULT_BUF_SIZE = 16384;
BufferedOutStream::BufferedOutStream(size_t bufSize_)
: bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
{
ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
}
BufferedOutStream::~BufferedOutStream()
{
// FIXME: Complain about non-flushed buffer?
delete [] start;
}
size_t BufferedOutStream::length()
{
return offset + ptr - sentUpTo;
}
size_t BufferedOutStream::bufferUsage()
{
return ptr - sentUpTo;
}
void BufferedOutStream::flush()
{
while (sentUpTo < ptr) {
size_t len;
len = bufferUsage();
if (!flushBuffer(false))
break;
offset += len - bufferUsage();
}
// Managed to flush everything?
if (sentUpTo == ptr)
ptr = sentUpTo = start;
}
size_t BufferedOutStream::overrun(size_t itemSize, size_t nItems)
{
if (itemSize > bufSize)
throw Exception("BufferedOutStream overrun: "
"requested size of %lu bytes exceeds maximum of %lu bytes",
(long unsigned)itemSize, (long unsigned)bufSize);
// First try to get rid of the data we have
flush();
// Still not enough space?
while (itemSize > avail()) {
// Can we shuffle things around?
// (don't do this if it gains us less than 25%)
if (((size_t)(sentUpTo - start) > bufSize / 4) &&
(itemSize < bufSize - (ptr - sentUpTo))) {
memmove(start, sentUpTo, ptr - sentUpTo);
ptr = start + (ptr - sentUpTo);
sentUpTo = start;
} else {
size_t len;
len = bufferUsage();
// Have to get rid of more data, so allow the flush to wait...
flushBuffer(true);
offset += len - bufferUsage();
// Managed to flush everything?
if (sentUpTo == ptr)
ptr = sentUpTo = start;
}
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
}

@ -0,0 +1,65 @@
/* Copyright (C) 2002-2005 RealVNC Ltd. All Rights Reserved.
* Copyright 2011-2020 Pierre Ossman for Cendio AB
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
* USA.
*/
//
// Base class for output streams with a buffer
//
#ifndef __RDR_BUFFEREDOUTSTREAM_H__
#define __RDR_BUFFEREDOUTSTREAM_H__
#include <rdr/OutStream.h>
namespace rdr {
class BufferedOutStream : public OutStream {
public:
virtual ~BufferedOutStream();
virtual size_t length();
virtual void flush();
size_t bufferUsage();
private:
// flushBuffer() requests that the stream be flushed. Returns true if it is
// able to progress the output (which might still not mean any bytes
// actually moved) and can be called again. If wait is true then it will
// block until all data has been written.
virtual bool flushBuffer(bool wait) = 0;
virtual size_t overrun(size_t itemSize, size_t nItems);
private:
size_t bufSize;
size_t offset;
U8* start;
protected:
U8* sentUpTo;
protected:
BufferedOutStream(size_t bufSize=0);
};
}
#endif

@ -1,6 +1,8 @@
include_directories(${CMAKE_SOURCE_DIR}/common ${ZLIB_INCLUDE_DIRS})
add_library(rdr STATIC
BufferedInStream.cxx
BufferedOutStream.cxx
Exception.cxx
FdInStream.cxx
FdOutStream.cxx

@ -36,13 +36,6 @@
#include <unistd.h>
#endif
#ifndef vncmin
#define vncmin(a,b) (((a) < (b)) ? (a) : (b))
#endif
#ifndef vncmax
#define vncmax(a,b) (((a) > (b)) ? (a) : (b))
#endif
/* Old systems have select() in sys/time.h */
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
@ -57,26 +50,23 @@ enum { DEFAULT_BUF_SIZE = 8192 };
FdInStream::FdInStream(int fd_, int timeoutms_, size_t bufSize_,
bool closeWhenDone_)
: fd(fd_), closeWhenDone(closeWhenDone_),
: BufferedInStream(bufSize_),
fd(fd_), closeWhenDone(closeWhenDone_),
timeoutms(timeoutms_), blockCallback(0),
timing(false), timeWaitedIn100us(5), timedKbits(0),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
timing(false), timeWaitedIn100us(5), timedKbits(0)
{
ptr = end = start = new U8[bufSize];
}
FdInStream::FdInStream(int fd_, FdInStreamBlockCallback* blockCallback_,
size_t bufSize_)
: fd(fd_), timeoutms(0), blockCallback(blockCallback_),
timing(false), timeWaitedIn100us(5), timedKbits(0),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
: BufferedInStream(bufSize_),
fd(fd_), timeoutms(0), blockCallback(blockCallback_),
timing(false), timeWaitedIn100us(5), timedKbits(0)
{
ptr = end = start = new U8[bufSize];
}
FdInStream::~FdInStream()
{
delete [] start;
if (closeWhenDone) close(fd);
}
@ -91,46 +81,15 @@ void FdInStream::setBlockCallback(FdInStreamBlockCallback* blockCallback_)
timeoutms = 0;
}
size_t FdInStream::pos()
{
return offset + ptr - start;
}
size_t FdInStream::overrun(size_t itemSize, size_t nItems, bool wait)
bool FdInStream::fillBuffer(size_t maxSize, bool wait)
{
if (itemSize > bufSize)
throw Exception("FdInStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
offset += ptr - start;
end -= ptr - start;
ptr = start;
size_t bytes_to_read;
while ((size_t)(end - start) < itemSize) {
bytes_to_read = start + bufSize - end;
if (!timing) {
// When not timing, we must be careful not to read too much
// extra data into the buffer. Otherwise, the line speed
// estimation might stay at zero for a long time: All reads
// during timing=1 can be satisfied without calling
// readWithTimeoutOrCallback. However, reading only 1 or 2 bytes
// bytes is ineffecient.
bytes_to_read = vncmin(bytes_to_read, vncmax(itemSize*nItems, 8));
}
size_t n = readWithTimeoutOrCallback((U8*)end, bytes_to_read, wait);
if (n == 0) return 0;
size_t n = readWithTimeoutOrCallback((U8*)end, maxSize, wait);
if (n == 0)
return false;
end += n;
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}
//

@ -23,7 +23,7 @@
#ifndef __RDR_FDINSTREAM_H__
#define __RDR_FDINSTREAM_H__
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
namespace rdr {
@ -33,7 +33,7 @@ namespace rdr {
virtual ~FdInStreamBlockCallback() {}
};
class FdInStream : public InStream {
class FdInStream : public BufferedInStream {
public:
@ -46,17 +46,15 @@ namespace rdr {
void setTimeout(int timeoutms);
void setBlockCallback(FdInStreamBlockCallback* blockCallback);
int getFd() { return fd; }
size_t pos();
void startTiming();
void stopTiming();
unsigned int kbitsPerSecond();
unsigned int timeWaited() { return timeWaitedIn100us; }
protected:
size_t overrun(size_t itemSize, size_t nItems, bool wait);
private:
virtual bool fillBuffer(size_t maxSize, bool wait);
size_t readWithTimeoutOrCallback(void* buf, size_t len, bool wait=true);
int fd;
@ -68,7 +66,6 @@ namespace rdr {
unsigned int timeWaitedIn100us;
unsigned int timedKbits;
size_t bufSize;
size_t offset;
U8* start;
};

@ -49,26 +49,20 @@
using namespace rdr;
enum { DEFAULT_BUF_SIZE = 16384 };
FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, size_t bufSize_)
: fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
: BufferedOutStream(bufSize_),
fd(fd_), blocking(blocking_), timeoutms(timeoutms_)
{
ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
gettimeofday(&lastWrite, NULL);
}
FdOutStream::~FdOutStream()
{
try {
blocking = true;
flush();
while (sentUpTo != ptr)
flushBuffer(true);
} catch (Exception&) {
}
delete [] start;
}
void FdOutStream::setTimeout(int timeoutms_) {
@ -79,82 +73,29 @@ void FdOutStream::setBlocking(bool blocking_) {
blocking = blocking_;
}
size_t FdOutStream::length()
{
return offset + ptr - sentUpTo;
}
int FdOutStream::bufferUsage()
{
return ptr - sentUpTo;
}
unsigned FdOutStream::getIdleTime()
{
return rfb::msSince(&lastWrite);
}
void FdOutStream::flush()
bool FdOutStream::flushBuffer(bool wait)
{
while (sentUpTo < ptr) {
size_t n = writeWithTimeout((const void*) sentUpTo,
ptr - sentUpTo,
blocking? timeoutms : 0);
(blocking || wait)? timeoutms : 0);
// Timeout?
if (n == 0) {
// If non-blocking then we're done here
if (!blocking)
break;
if (!blocking && !wait)
return false;
throw TimedOut();
}
sentUpTo += n;
offset += n;
}
// Managed to flush everything?
if (sentUpTo == ptr)
ptr = sentUpTo = start;
}
size_t FdOutStream::overrun(size_t itemSize, size_t nItems)
{
if (itemSize > bufSize)
throw Exception("FdOutStream overrun: max itemSize exceeded");
// First try to get rid of the data we have
flush();
// Still not enough space?
if (itemSize > avail()) {
// Can we shuffle things around?
// (don't do this if it gains us less than 25%)
if (((size_t)(sentUpTo - start) > bufSize / 4) &&
(itemSize < bufSize - (ptr - sentUpTo))) {
memmove(start, sentUpTo, ptr - sentUpTo);
ptr = start + (ptr - sentUpTo);
sentUpTo = start;
} else {
// Have to get rid of more data, so turn off non-blocking
// for a bit...
bool realBlocking;
realBlocking = blocking;
blocking = true;
flush();
blocking = realBlocking;
}
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}
//

@ -26,11 +26,11 @@
#include <sys/time.h>
#include <rdr/OutStream.h>
#include <rdr/BufferedOutStream.h>
namespace rdr {
class FdOutStream : public OutStream {
class FdOutStream : public BufferedOutStream {
public:
@ -41,23 +41,14 @@ namespace rdr {
void setBlocking(bool blocking);
int getFd() { return fd; }
void flush();
size_t length();
int bufferUsage();
unsigned getIdleTime();
private:
size_t overrun(size_t itemSize, size_t nItems);
virtual bool flushBuffer(bool wait);
size_t writeWithTimeout(const void* data, size_t length, int timeoutms);
int fd;
bool blocking;
int timeoutms;
size_t bufSize;
size_t offset;
U8* start;
U8* sentUpTo;
struct timeval lastWrite;
};

@ -30,7 +30,6 @@ FileInStream::FileInStream(const char *fileName)
file = fopen(fileName, "rb");
if (!file)
throw SystemException("fopen", errno);
ptr = end = b;
}
FileInStream::~FileInStream(void) {
@ -40,50 +39,17 @@ FileInStream::~FileInStream(void) {
}
}
void FileInStream::reset(void) {
if (!file)
throw Exception("File is not open");
if (fseek(file, 0, SEEK_SET) != 0)
throw SystemException("fseek", errno);
ptr = end = b;
}
size_t FileInStream::pos()
{
if (!file)
throw Exception("File is not open");
return ftell(file) + ptr - b;
}
size_t FileInStream::overrun(size_t itemSize, size_t nItems, bool wait)
bool FileInStream::fillBuffer(size_t maxSize, bool wait)
{
if (itemSize > sizeof(b))
throw Exception("FileInStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(b, ptr, end - ptr);
end -= ptr - b;
ptr = b;
while ((size_t)(end - b) < itemSize) {
size_t n = fread((U8 *)end, b + sizeof(b) - end, 1, file);
size_t n = fread((U8 *)end, 1, maxSize, file);
if (n == 0) {
if (ferror(file))
throw SystemException("fread", errno);
if (feof(file))
throw EndOfStream();
return 0;
}
end += b + sizeof(b) - end;
return false;
}
end += n;
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}

@ -22,26 +22,21 @@
#include <stdio.h>
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
namespace rdr {
class FileInStream : public InStream {
class FileInStream : public BufferedInStream {
public:
FileInStream(const char *fileName);
~FileInStream(void);
void reset(void);
size_t pos();
protected:
size_t overrun(size_t itemSize, size_t nItems, bool wait = true);
private:
virtual bool fillBuffer(size_t maxSize, bool wait);
private:
U8 b[131072];
FILE *file;
};

@ -24,18 +24,14 @@
using namespace rdr;
const int DEFAULT_BUF_LEN = 16384;
static inline int min(int a, int b) {return a<b ? a : b;}
HexInStream::HexInStream(InStream& is, size_t bufSize_)
: bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_LEN), offset(0), in_stream(is)
: BufferedInStream(bufSize_), in_stream(is)
{
ptr = end = start = new U8[bufSize];
}
HexInStream::~HexInStream() {
delete [] start;
}
@ -76,27 +72,13 @@ decodeError:
}
size_t HexInStream::pos() {
return offset + ptr - start;
}
size_t HexInStream::overrun(size_t itemSize, size_t nItems, bool wait) {
if (itemSize > bufSize)
throw Exception("HexInStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
end -= ptr - start;
offset += ptr - start;
ptr = start;
bool HexInStream::fillBuffer(size_t maxSize, bool wait) {
if (!in_stream.check(2, 1, wait))
return false;
while (avail() < itemSize) {
size_t n = in_stream.check(2, 1, wait);
if (n == 0) return 0;
const U8* iptr = in_stream.getptr();
const U8* eptr = in_stream.getend();
size_t length = min((eptr - iptr)/2, start + bufSize - end);
size_t length = min((eptr - iptr)/2, maxSize);
U8* optr = (U8*) end;
for (size_t i=0; i<length; i++) {
@ -108,12 +90,6 @@ size_t HexInStream::overrun(size_t itemSize, size_t nItems, bool wait) {
in_stream.setptr(iptr + length*2);
end += length;
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}

@ -19,29 +19,23 @@
#ifndef __RDR_HEX_INSTREAM_H__
#define __RDR_HEX_INSTREAM_H__
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
namespace rdr {
class HexInStream : public InStream {
class HexInStream : public BufferedInStream {
public:
HexInStream(InStream& is, size_t bufSize=0);
virtual ~HexInStream();
size_t pos();
static bool readHexAndShift(char c, int* v);
static bool hexStrToBin(const char* s, char** data, size_t* length);
protected:
size_t overrun(size_t itemSize, size_t nItems, bool wait);
private:
size_t bufSize;
U8* start;
size_t offset;
virtual bool fillBuffer(size_t maxSize, bool wait);
private:
InStream& in_stream;
};

@ -32,15 +32,10 @@
using namespace rdr;
const size_t DEFAULT_BUF_LEN = 256;
unsigned int RandomStream::seed;
RandomStream::RandomStream()
: offset(0)
{
ptr = end = start = new U8[DEFAULT_BUF_LEN];
#ifdef RFB_HAVE_WINCRYPT
provider = 0;
if (!CryptAcquireContext(&provider, 0, 0, PROV_RSA_FULL, 0)) {
@ -72,8 +67,6 @@ RandomStream::RandomStream()
}
RandomStream::~RandomStream() {
delete [] start;
#ifdef RFB_HAVE_WINCRYPT
if (provider)
CryptReleaseContext(provider, 0);
@ -83,50 +76,29 @@ RandomStream::~RandomStream() {
#endif
}
size_t RandomStream::pos() {
return offset + ptr - start;
}
size_t RandomStream::overrun(size_t itemSize, size_t nItems, bool wait) {
if (itemSize > DEFAULT_BUF_LEN)
throw Exception("RandomStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
end -= ptr - start;
offset += ptr - start;
ptr = start;
size_t length = start + DEFAULT_BUF_LEN - end;
bool RandomStream::fillBuffer(size_t maxSize, bool wait) {
#ifdef RFB_HAVE_WINCRYPT
if (provider) {
if (!CryptGenRandom(provider, length, (U8*)end))
if (!CryptGenRandom(provider, maxSize, (U8*)end))
throw rdr::SystemException("unable to CryptGenRandom", GetLastError());
end += length;
end += maxSize;
} else {
#else
#ifndef WIN32
if (fp) {
size_t n = fread((U8*)end, length, 1, fp);
if (n != 1)
size_t n = fread((U8*)end, 1, maxSize, fp);
if (n <= 0)
throw rdr::SystemException("reading /dev/urandom or /dev/random failed",
errno);
end += length;
end += n;
} else {
#else
{
#endif
#endif
for (size_t i=0; i<length; i++)
for (size_t i=0; i<maxSize; i++)
*(U8*)end++ = (int) (256.0*rand()/(RAND_MAX+1.0));
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}

@ -20,7 +20,7 @@
#define __RDR_RANDOMSTREAM_H__
#include <stdio.h>
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
#ifdef WIN32
#include <windows.h>
@ -32,22 +32,17 @@
namespace rdr {
class RandomStream : public InStream {
class RandomStream : public BufferedInStream {
public:
RandomStream();
virtual ~RandomStream();
size_t pos();
protected:
size_t overrun(size_t itemSize, size_t nItems, bool wait);
private:
U8* start;
size_t offset;
virtual bool fillBuffer(size_t maxSize, bool wait);
private:
static unsigned int seed;
#ifdef RFB_HAVE_WINCRYPT
HCRYPTPROV provider;

@ -30,8 +30,6 @@
#ifdef HAVE_GNUTLS
using namespace rdr;
enum { DEFAULT_BUF_SIZE = 16384 };
ssize_t TLSInStream::pull(gnutls_transport_ptr_t str, void* data, size_t size)
{
TLSInStream* self= (TLSInStream*) str;
@ -43,8 +41,8 @@ ssize_t TLSInStream::pull(gnutls_transport_ptr_t str, void* data, size_t size)
return -1;
}
if ((size_t)(in->getend() - in->getptr()) < size)
size = in->getend() - in->getptr();
if (in->avail() < size)
size = in->avail();
in->readBytes(data, size);
@ -57,12 +55,10 @@ ssize_t TLSInStream::pull(gnutls_transport_ptr_t str, void* data, size_t size)
}
TLSInStream::TLSInStream(InStream* _in, gnutls_session_t _session)
: session(_session), in(_in), bufSize(DEFAULT_BUF_SIZE), offset(0)
: session(_session), in(_in)
{
gnutls_transport_ptr_t recv, send;
ptr = end = start = new U8[bufSize];
gnutls_transport_set_pull_function(session, pull);
gnutls_transport_get_ptr2(session, &recv, &send);
gnutls_transport_set_ptr2(session, this, send);
@ -71,40 +67,16 @@ TLSInStream::TLSInStream(InStream* _in, gnutls_session_t _session)
TLSInStream::~TLSInStream()
{
gnutls_transport_set_pull_function(session, NULL);
delete[] start;
}
size_t TLSInStream::pos()
{
return offset + ptr - start;
}
size_t TLSInStream::overrun(size_t itemSize, size_t nItems, bool wait)
bool TLSInStream::fillBuffer(size_t maxSize, bool wait)
{
if (itemSize > bufSize)
throw Exception("TLSInStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
offset += ptr - start;
end -= ptr - start;
ptr = start;
while ((size_t)(end - start) < itemSize) {
size_t n = readTLS((U8*) end, start + bufSize - end, wait);
size_t n = readTLS((U8*) end, maxSize, wait);
if (!wait && n == 0)
return 0;
return false;
end += n;
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
return true;
}
size_t TLSInStream::readTLS(U8* buf, size_t len, bool wait)

@ -27,27 +27,22 @@
#ifdef HAVE_GNUTLS
#include <gnutls/gnutls.h>
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
namespace rdr {
class TLSInStream : public InStream {
class TLSInStream : public BufferedInStream {
public:
TLSInStream(InStream* in, gnutls_session_t session);
virtual ~TLSInStream();
size_t pos();
private:
size_t overrun(size_t itemSize, size_t nItems, bool wait);
virtual bool fillBuffer(size_t maxSize, bool wait);
size_t readTLS(U8* buf, size_t len, bool wait);
static ssize_t pull(gnutls_transport_ptr_t str, void* data, size_t size);
gnutls_session_t session;
InStream* in;
size_t bufSize;
size_t offset;
U8* start;
};
};

@ -24,41 +24,31 @@
using namespace rdr;
enum { DEFAULT_BUF_SIZE = 16384 };
ZlibInStream::ZlibInStream(size_t bufSize_)
: underlying(0), bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0),
zs(NULL), bytesIn(0)
: BufferedInStream(bufSize_),
underlying(0), zs(NULL), bytesIn(0)
{
ptr = end = start = new U8[bufSize];
init();
}
ZlibInStream::~ZlibInStream()
{
deinit();
delete [] start;
}
void ZlibInStream::setUnderlying(InStream* is, size_t bytesIn_)
{
underlying = is;
bytesIn = bytesIn_;
ptr = end = start;
}
size_t ZlibInStream::pos()
{
return offset + ptr - start;
skip(avail());
}
void ZlibInStream::flushUnderlying()
{
ptr = end = start;
while (bytesIn > 0) {
decompress(true);
end = start; // throw away any data
if (!check(1))
throw Exception("ZlibInStream: failed to flush remaining stream data");
skip(avail());
}
setUnderlying(NULL, 0);
@ -96,42 +86,13 @@ void ZlibInStream::deinit()
zs = NULL;
}
size_t ZlibInStream::overrun(size_t itemSize, size_t nItems, bool wait)
{
if (itemSize > bufSize)
throw Exception("ZlibInStream overrun: max itemSize exceeded");
if (end - ptr != 0)
memmove(start, ptr, end - ptr);
offset += ptr - start;
end -= ptr - start;
ptr = start;
while (avail() < itemSize) {
if (!decompress(wait))
return 0;
}
size_t nAvail;
nAvail = avail() / itemSize;
if (nAvail < nItems)
return nAvail;
return nItems;
}
// decompress() calls the decompressor once. Note that this won't necessarily
// generate any output data - it may just consume some input data. Returns
// false if wait is false and we would block on the underlying stream.
bool ZlibInStream::decompress(bool wait)
bool ZlibInStream::fillBuffer(size_t maxSize, bool wait)
{
if (!underlying)
throw Exception("ZlibInStream overrun: no underlying stream");
zs->next_out = (U8*)end;
zs->avail_out = start + bufSize - end;
zs->avail_out = maxSize;
size_t n = underlying->check(1, 1, wait);
if (n == 0) return false;

@ -24,38 +24,32 @@
#ifndef __RDR_ZLIBINSTREAM_H__
#define __RDR_ZLIBINSTREAM_H__
#include <rdr/InStream.h>
#include <rdr/BufferedInStream.h>
struct z_stream_s;
namespace rdr {
class ZlibInStream : public InStream {
class ZlibInStream : public BufferedInStream {
public:
ZlibInStream(size_t bufSize=0);
virtual ~ZlibInStream();
void setUnderlying(InStream* is, size_t bytesIn);
void flushUnderlying();
size_t pos();
void reset();
private:
void init();
void deinit();
size_t overrun(size_t itemSize, size_t nItems, bool wait);
bool decompress(bool wait);
virtual bool fillBuffer(size_t maxSize, bool wait);
private:
InStream* underlying;
size_t bufSize;
size_t offset;
z_stream_s* zs;
size_t bytesIn;
U8* start;
};
} // end of namespace rdr

Loading…
Cancel
Save