Pumping Data
Pumping data at certain rates or in user-supplied blocks is a subject that surfaces on occasion. The need often arises with custom protocols and resource-constrained devices like mobile phones and development boards. This wiki article will show you how to manually pump data.
When manually pumping data you normally select a block size, like 4K or 64K. You create the source, like a StringSource or FileSource, and set pumpAll to false to signal that the program will control the flow. Then, you call Pump on the source to process data from the source. You can also call Flush on occasion to ensure processed data is cleared from the pipeline (but you should not have to if things are working as expected).
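Putting those pieces together, a minimal sketch of the pattern follows. The 100-byte message, 16-byte block size and HexEncoder are illustrative only.

#include "cryptlib.h"
#include "filters.h"
#include "misc.h"
#include "hex.h"

#include <string>
#include <iostream>

int main(int argc, char* argv[])
{
    using namespace CryptoPP;

    const std::string data(100, 'A');    // illustrative message
    std::string encoded;

    // pumpAll is false, so the program controls the flow
    StringSource source(data, false /*pumpAll*/,
        new HexEncoder(new StringSink(encoded)));

    const lword BLOCK_SIZE = 16;
    lword remaining = data.size();

    while (remaining > 0)
    {
        const lword req = STDMIN(remaining, BLOCK_SIZE);
        source.Pump(req);        // process the next block
        source.Flush(false);     // soft flush; optional housekeeping
        remaining -= req;
    }

    // Process any leftovers and signal the end of the message
    source.PumpAll();

    std::cout << encoded << std::endl;
    return 0;
}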
There are two types of flushes available: a soft flush and a hard flush. The soft flush is Flush(false), and a hard flush is Flush(true). You have to be careful about flushing data in the pipeline. According to the source code comments in filters.h:
Hard flushes must be used with care. It means try to process and output everything, even if there may not be enough data to complete the action. For example, hard flushing a HexDecoder would cause an error if you do it after inputting an odd number of hex encoded characters.
For some types of filters, like ZlibDecompressor, hard flushes can only be done at "synchronization points". These synchronization points are positions in the data stream that are created by hard flushes on the corresponding reverse filters, in this example ZlibCompressor. This is useful when zlib compressed data is moved across a network in packets and compression state is preserved across packets, as in the SSH2 protocol.
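The HexDecoder behavior from the comment can be seen with a short sketch. Below, a soft flush after an odd number of hex digits simply leaves the dangling nibble buffered; a hard flush at that point would be the error case the comment warns about. The strings are illustrative.

#include "cryptlib.h"
#include "filters.h"
#include "hex.h"

#include <string>
#include <iostream>

int main(int argc, char* argv[])
{
    using namespace CryptoPP;

    std::string decoded;
    HexDecoder decoder(new StringSink(decoded));

    // Three hex digits: "41" decodes to 'A'; the trailing '4' is half a byte
    decoder.Put(reinterpret_cast<const CryptoPP::byte*>("414"), 3);

    decoder.Flush(false);    // soft flush: the dangling nibble stays buffered
    std::cout << "After soft flush: " << decoded << std::endl;   // A

    // Complete the pair and finish the message
    decoder.Put(reinterpret_cast<const CryptoPP::byte*>("1"), 1);
    decoder.MessageEnd();
    std::cout << "Final: " << decoded << std::endl;              // AA

    return 0;
}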
The Low Memory section below provides a quick test environment to verify operations with approximately 16 MB of memory.
Low Memory
You can simulate low memory in Bash by running ulimit in its own sub-shell so the limit does not affect subsequent commands. The command below simulates 16 MB of memory for the process. 8 MB and 12 MB were too small to start the process.
$ (ulimit -v 8192; ./pump.exe)
./pump.exe: error while loading shared libraries: libm.so.6: failed to map segment from shared object
$ (ulimit -v 12288; ./pump.exe)
./pump.exe: error while loading shared libraries: libc.so.6: failed to map segment from shared object
$ (ulimit -v 16368; ./pump.exe)
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760
Keep It Simple
Folks who pump their own data usually wish to do it because of low memory conditions. The library does fine on its own and usually does not need help: it chunks data in 4K blocks and can easily handle gigabytes of data. If you are trying to pump your own data, you are probably overthinking the solution.
For example, the program below calculates multiple hashes over a large 1 GB file. It does so using less than 4 MB of RAM for the program and data.
#include "cryptlib.h" #include "channels.h" #include "filters.h" #include "files.h" #include "sha.h" #include "crc.h" #include "hex.h" #include <string> #include <iostream> int main(int argc, char* argv[]) { using namespace CryptoPP; try { std::string s1, s2; CRC32 crc; SHA1 sha1; HashFilter f1(crc, new HexEncoder(new StringSink(s1))); HashFilter f2(sha1, new HexEncoder(new StringSink(s2))); ChannelSwitch cs; cs.AddDefaultRoute(f1); cs.AddDefaultRoute(f2); FileSource("data.bin", true /*pumpAll*/, new Redirector(cs)); std::cout << "Filename: " << "data.bin" << std::endl; std::cout << " CRC32: " << s1 << std::endl; std::cout << " SHA1: " << s2 << std::endl; } catch(const Exception& ex) { std::cerr << ex.what() << std::endl; } return 0; }
And the file:
$ dd if=/dev/urandom of=data.bin bs=1M count=1024
1024+0 records in
1024+0 records out
1073741824 bytes (1.1 GB, 1.0 GiB) copied, 5.48884 s, 196 MB/s
And the result with 16 MB of RAM is shown below. 12 MB is used to map the process, and less than 4 MB is used by the program:
$ (ulimit -v 16368; ./test.exe)
Filename: data.bin
   CRC32: 20F45F3E
    SHA1: F0857F8E46112BB08A04D5CE51BDBEA0C4539032
Filter Framework
The code below is the basic skeleton or framework to use when manually pumping data. The sample program in this section does not do much at all; about all it is capable of is counting bytes with a MeterFilter. Note that the bytes are counted between the filter and the sink, so they are the bytes being written to storage. Cache managers could affect writing of the data to storage, so be sure you understand what your OS or other libraries are doing.
SomeFilter is not a real filter; in practice it is often an encoder, decoder, encryptor, decryptor, compressor or decompressor. The Redirector breaks the ownership chain in a pipeline. It avoids nested calls to new and ensures objects like the MeterFilter survive so the measurement can be read.
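As a quick illustration of the ownership point, the following sketch keeps a stack-based MeterFilter alive behind a Redirector so GetTotalBytes can still be read after the pipeline runs. The string and HexEncoder are illustrative only.

#include "cryptlib.h"
#include "filters.h"
#include "files.h"
#include "hex.h"

#include <string>
#include <iostream>

int main(int argc, char* argv[])
{
    using namespace CryptoPP;

    // Stack-based meter; the pipeline must not own or delete it
    MeterFilter meter(new FileSink(std::cout));

    // The Redirector passes data to the meter without taking ownership
    HexEncoder encoder(new Redirector(meter));

    const std::string data = "Do or do not. There is no try.";
    StringSource ss(data, true /*pumpAll*/, new Redirector(encoder));

    // The meter survives, so the measurement can still be read
    std::cout << "\nBytes metered: " << meter.GetTotalBytes() << std::endl;
    return 0;
}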
The remaining counter is needed because of the way SourceExhausted operates in a pipeline. Effectively, SourceExhausted returns false until MessageEnd() arrives, because the machinery is message oriented and not byte oriented.
int main(int argc, char* argv[])
{
    try
    {
        MeterFilter meter;
        SomeFilter filter(...);
        SomeSource source(...);
        SomeSink sink(...);

        source.Attach(new Redirector(filter));
        filter.Attach(new Redirector(meter));
        meter.Attach(new Redirector(sink));

        const lword BLOCK_SIZE = 4096;
        lword remaining = ...;
        lword processed = 0;

        while(remaining && !source.SourceExhausted())
        {
            // STDMIN requires both arguments to have the same type
            const lword req = STDMIN(remaining, BLOCK_SIZE);

            source.Pump(req);
            filter.Flush(false);

            processed += req;
            remaining -= req;

            if (processed % (1024*1024*10) == 0)
                cout << "Processed: " << meter.GetTotalBytes() << endl;
        }

        // Signal there is no more data to process.
        // The dtor's will do this automatically.
        filter.MessageEnd();
    }
    catch(const Exception& ex)
    {
        cerr << ex.what() << endl;
    }

    return 0;
}
The code above elides lword remaining = .... It's easy enough to determine the value when using a StringSource because the size is readily at hand. For a FileSource, you can use the file size with the following code:
inline lword FileSize(const FileSource& file)
{
    std::istream* stream = const_cast<FileSource&>(file).GetStream();

    std::ifstream::pos_type old = stream->tellg();
    std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
    stream->seekg(old);

    return static_cast<lword>(end);
}
Or, you could just use end-of-file as a predicate (the const_cast is not needed after Commit ca9e788fbfada9c4).
inline bool EndOfFile(const FileSource& file)
{
    std::istream* stream = const_cast<FileSource&>(file).GetStream();
    return stream->eof();
}
Compression
The following sample compresses random data with Gzip. The data is read from a FileSource, but a StringSource or other source could be used as well. Because the data is random, it should be incompressible. You can read from /dev/zero instead if you need data that actually compresses.
To create the data before running the program, use dd:
$ dd if=/dev/urandom of=uncompress.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 4.55431 s, 118 MB/s
The program is shown below; it's a slightly modified version of the one presented in Filter Framework.
#include "cryptlib.h"
#include "filters.h"
#include "files.h"
#include "gzip.h"
#include "misc.h"

#include <iostream>
#include <fstream>

using namespace CryptoPP;
using std::cout;
using std::cerr;
using std::endl;

inline lword FileSize(const FileSource& file)
{
    std::istream* stream = const_cast<FileSource&>(file).GetStream();

    std::ifstream::pos_type old = stream->tellg();
    std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
    stream->seekg(old);

    return static_cast<lword>(end);
}

int main(int argc, char* argv[])
{
    try
    {
        MeterFilter meter;
        Gzip filter;

        FileSource source("uncompress.bin", false);
        FileSink sink("compress.bin");

        source.Attach(new Redirector(filter));
        filter.Attach(new Redirector(meter));
        meter.Attach(new Redirector(sink));

        const lword BLOCK_SIZE = 4096;
        lword remaining = FileSize(source);
        lword processed = 0;

        while(remaining && !source.SourceExhausted())
        {
            const lword req = STDMIN(remaining, BLOCK_SIZE);

            source.Pump(req);
            filter.Flush(false);

            processed += req;
            remaining -= req;

            if (processed % (1024*1024*10) == 0)
                cout << "Processed: " << meter.GetTotalBytes() << endl;
        }

        // Signal there is no more data to process.
        // The dtor's will do this automatically.
        filter.MessageEnd();
    }
    catch(const Exception& ex)
    {
        cerr << ex.what() << endl;
    }

    return 0;
}
Running the program on a 6th-generation Intel Core machine results in the following output:
$ time ./pump.exe
Processed: 10498570
Processed: 20997130
Processed: 31495690
Processed: 41994250
...
Processed: 503930890
Processed: 514429450
Processed: 524928010
Processed: 535426570

real    0m5.837s
user    0m2.549s
sys     0m0.628s
And checking the file sizes results in:
$ ls -Al *.bin
-rw-rw-r--. 1 user user 537526282 May  2 17:10 compress.bin
-rw-rw-r--. 1 user user 536870912 May  2 17:10 uncompress.bin
Encryption
The next example performs encryption on a file. It's the same basic example as in Filter Framework and Compression, but it uses EndOfFile rather than FileSize. The sample data is called plain.bin, and it can be created with:
$ dd if=/dev/zero of=./plain.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 1.22499 s, 438 MB/s
The modified program is shown below. The const_cast in EndOfFile is not needed after Commit ca9e788fbfada9c4.
#include "cryptlib.h"
#include "filters.h"
#include "files.h"
#include "aes.h"
#include "modes.h"

#include <iostream>

using namespace CryptoPP;
using std::cout;
using std::cerr;
using std::endl;

inline bool EndOfFile(const FileSource& file)
{
    std::istream* stream = const_cast<FileSource&>(file).GetStream();
    return stream->eof();
}

int main(int argc, char* argv[])
{
    try
    {
        byte key[AES::DEFAULT_KEYLENGTH]={}, iv[AES::BLOCKSIZE]={};

        CTR_Mode<AES>::Encryption encryptor;
        encryptor.SetKeyWithIV(key, sizeof(key), iv);

        MeterFilter meter;
        StreamTransformationFilter filter(encryptor);

        FileSource source("plain.bin", false);
        FileSink sink("cipher.bin");

        source.Attach(new Redirector(filter));
        filter.Attach(new Redirector(meter));
        meter.Attach(new Redirector(sink));

        const size_t BLOCK_SIZE = 4096;
        lword processed = 0;

        while(!EndOfFile(source) && !source.SourceExhausted())
        {
            source.Pump(BLOCK_SIZE);
            filter.Flush(false);

            processed += BLOCK_SIZE;

            if (processed % (1024*1024*10) == 0)
                cout << "Processed: " << meter.GetTotalBytes() << endl;
        }

        // Signal there is no more data to process.
        // The dtor's will do this automatically.
        filter.MessageEnd();
    }
    catch(const Exception& ex)
    {
        cerr << ex.what() << endl;
    }

    return 0;
}
Running the program results in:
$ time ./pump.exe
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760

real    0m0.951s
user    0m0.240s
sys     0m0.710s
Finally, the respective file sizes:
$ ls -Al *.bin
-rw-rw-r--. 1 user user 536870912 May  2 18:13 cipher.bin
-rw-rw-r--. 1 user user 536870912 May  2 18:12 plain.bin
Skipping Data
Sometimes you need to skip data in the stream. Skip is part of the Filter interface, and it works on the output buffer, not the source buffer. More precisely, Skip applies to the AttachedTransformation, so you can't skip the input data without a little extra work.
To Skip bytes on a Source, use a NULL AttachedTransformation. Also see Skip'ing on a Source does not work as expected on Stack Overflow and Issue 248: Skip'ing on a Source does not work.
#include "cryptlib.h"
#include "filters.h"
#include "hex.h"

#include <string>
#include <iostream>

using namespace CryptoPP;
using std::cout;
using std::endl;
using std::string;

int main(int argc, char* argv[])
{
    string str1, str2;
    HexEncoder enc(new StringSink(str1));

    for(unsigned int i=0; i < 32; i++)
        enc.Put((byte)i);
    enc.MessageEnd();

    cout << "str1: " << str1 << endl;

    // 'ss' has a NULL AttachedTransformation()
    StringSource ss(str1, false);
    ss.Pump(10);

    // Attach the real filter chain to 'ss'
    ss.Attach(new StringSink(str2));
    ss.PumpAll();

    cout << "str2: " << str2 << endl;
    return 0;
}
If you have to skip in the middle of a stream, you can swap in TheBitBucket.
#include "cryptlib.h"
#include "filters.h"
#include "hex.h"

#include <string>
#include <iostream>

using namespace CryptoPP;
using std::cout;
using std::endl;
using std::string;

int main(int argc, char* argv[])
{
    string str1, str2;
    HexEncoder enc(new StringSink(str1));

    for(unsigned int i=0; i < 32; i++)
        enc.Put((byte)i);
    enc.MessageEnd();

    cout << "str1: " << str1 << endl;

    // Pump 10 bytes into the sink
    StringSource ss(str1, false, new StringSink(str2));
    ss.Pump(10);
    cout << "str2: " << str2 << endl;

    // Skip 4 bytes; with no attachment, output goes to TheBitBucket
    ss.Attach(NULLPTR);
    ss.Pump(4);

    // Pump remaining bytes to sink
    ss.Attach(new StringSink(str2));
    ss.PumpAll();

    cout << "str2: " << str2 << endl;
    return 0;
}
The program above produces the following output.
$ ./test.exe
str1: 000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F
str2: 0001020304
str2: 00010203040708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F
MeterFilter
You can also use a MeterFilter to skip bytes in a pipeline. The MeterFilter has a really cool feature that allows you to skip a range of bytes.
The code below creates a file with binary data. The binary data is just the values 0 to 255.
$ hexdump -C data.bin
00000000  00 01 02 03 04 05 06 07  08 09 0a 0b 0c 0d 0e 0f  |................|
00000010  10 11 12 13 14 15 16 17  18 19 1a 1b 1c 1d 1e 1f  |................|
00000020  20 21 22 23 24 25 26 27  28 29 2a 2b 2c 2d 2e 2f  | !"#$%&'()*+,-./|
00000030  30 31 32 33 34 35 36 37  38 39 3a 3b 3c 3d 3e 3f  |0123456789:;<=>?|
00000040  40 41 42 43 44 45 46 47  48 49 4a 4b 4c 4d 4e 4f  |@ABCDEFGHIJKLMNO|
00000050  50 51 52 53 54 55 56 57  58 59 5a 5b 5c 5d 5e 5f  |PQRSTUVWXYZ[\]^_|
00000060  60 61 62 63 64 65 66 67  68 69 6a 6b 6c 6d 6e 6f  |`abcdefghijklmno|
00000070  70 71 72 73 74 75 76 77  78 79 7a 7b 7c 7d 7e 7f  |pqrstuvwxyz{|}~.|
00000080  80 81 82 83 84 85 86 87  88 89 8a 8b 8c 8d 8e 8f  |................|
00000090  90 91 92 93 94 95 96 97  98 99 9a 9b 9c 9d 9e 9f  |................|
000000a0  a0 a1 a2 a3 a4 a5 a6 a7  a8 a9 aa ab ac ad ae af  |................|
000000b0  b0 b1 b2 b3 b4 b5 b6 b7  b8 b9 ba bb bc bd be bf  |................|
000000c0  c0 c1 c2 c3 c4 c5 c6 c7  c8 c9 ca cb cc cd ce cf  |................|
000000d0  d0 d1 d2 d3 d4 d5 d6 d7  d8 d9 da db dc dd de df  |................|
000000e0  e0 e1 e2 e3 e4 e5 e6 e7  e8 e9 ea eb ec ed ee ef  |................|
000000f0  f0 f1 f2 f3 f4 f5 f6 f7  f8 f9 fa fb fc fd fe ff  |................|
Then the code skips the first 100 bytes and the last 33 bytes. Bytes that are not skipped are processed by a HexEncoder and printed to stdout. The Redirector stops ownership (and destruction) since the MeterFilter is stack based. The MeterFilter will be destroyed when the stack frame goes out of scope, so the FileSource does not need to destroy it.
#include "cryptlib.h" #include "filters.h" #include "files.h" #include "hex.h" #include <iostream> #include <fstream> using namespace CryptoPP; inline lword FileSize(const FileSource& file) { std::istream* stream = const_cast<FileSource&>(file).GetStream(); std::ifstream::pos_type old = stream->tellg(); std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg(); stream->seekg(old); return static_cast<lword>(end); } int main(int argc, char* argv[]) { const std::string filename = "data.bin"; const size_t skip_lead = 100; const size_t skip_tail = 33; // Create a test file with some data std::ofstream output(filename.c_str(), std::ios::binary); for (size_t i=0; i<256; ++i) { char ch = (char)i; output.write(&ch, 1); } output.close(); // Create a FileSource, but do not pumpAll FileSource fs(filename.c_str(), false /*pumpAll*/); const lword fileSize = FileSize(fs); // Create a MeterFilter to skip bytes MeterFilter meter(new HexEncoder(new FileSink(std::cout))); meter.AddRangeToSkip(0, 0, skip_lead, false); meter.AddRangeToSkip(0, fileSize-skip_tail, skip_tail, true); // Process the stream fs.Attach(new Redirector(meter)); fs.PumpAll(); std::cout << std::endl; return 0; }
Running the program results in the following output.
$ ./pump_data.exe
6465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B
8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3
B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADB
DCDDDE
Multiple Sources
Sometimes you need to use multiple sources to transform data. For example, you may need a MySource(s1+f1, ...), where s1 is a signature and f1 is file data, but f1 is too large to fit in memory. Also see How to combine two Sources into new one in Crypto++? on Stack Overflow.
To set up the example, use the following code. It writes random ASCII printable characters to a string and a file.
#include "cryptlib.h"
#include "filters.h"
#include "files.h"

#include <string>
#include <cstdlib>

using namespace CryptoPP;

void random_string(std::string& str, size_t len)
{
    const char alphanum[] =
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz";

    const size_t size = sizeof(alphanum) - 1;
    str.reserve(len);

    for (size_t i = 0; i < len; ++i)
        str.push_back(alphanum[rand() % size]);
}

int main(int argc, char* argv[])
{
    // Deterministic for a machine
    std::srand(0);

    std::string s1, s2, r;
    const size_t size = 1024*16+1;

    random_string(s1, size);
    random_string(s2, size);

    // Write s2 to file
    StringSource(s2, true, new FileSink("test.dat"));

    ...
}
Then, to use the multiple sources on the data, perform the following. The hash and hex objects are not shown in the fragment; assumed declarations are given in the comments.
// Assumed declarations for this fragment, for example:
//   SHA256 hash;
//   HexEncoder hex(new FileSink(std::cout));

StringSource ss1(s1, false);
FileSource fs1("test.dat", false);
HashFilter hf1(hash, new StringSink(r));

ss1.Attach(new Redirector(hf1));
ss1.Pump(LWORD_MAX);
ss1.Detach();

fs1.Attach(new Redirector(hf1));
fs1.Pump(LWORD_MAX);
fs1.Detach();

hf1.MessageEnd();

std::cout << "s1 + f1: ";
hex.Put((const byte*)r.data(), r.size());
std::cout << std::endl;
There are several things going on in the code above. First, we dynamically attach and detach the hash filter chain to the sources ss1 and fs1.
Second, once the filter is attached, we use Pump(LWORD_MAX) to pump all the data from the source into the filter chain. We don't use PumpAll() because PumpAll() signals the end of the current message and generates a MessageEnd().
The reason we don't want a MessageEnd() is that we are processing one message in multiple parts; we are not processing multiple messages. So we want only one MessageEnd(), issued when we decide the message is complete.
Third, once we are done with a source, we call Detach so the StringSource and FileSource destructors don't cause a spurious MessageEnd() to enter the filter chain. Again, we are processing one message in multiple parts, not multiple messages.
Fourth, when we are done sending our data into the filter, we call hf1.MessageEnd() to tell the filter to process all pending or buffered data. This is when we want the MessageEnd() call, and not before.
Fifth, we call Detach() when done rather than Attach(). Detach() deletes the existing filter chain and avoids memory leaks; Attach() attaches a new chain but does not delete the existing filter or chain. Since we are using a Redirector, our HashFilter survives. The HashFilter is eventually cleaned up as an automatic stack variable.
As an aside, if ss1.PumpAll() or fs1.PumpAll() were used (or the destructors were allowed to send MessageEnd() into the filter chain), then you would get a concatenation of Hash(s1) and Hash(s2) because it would look like two different messages to the filter instead of one message in two parts.
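The pitfall is easy to reproduce with a variation of the fragment above. The sketch below assumes the same s1 and test.dat; SHA256 and the variable names are illustrative. Because each PumpAll() ends a message, the filter emits two digests instead of one.

// A sketch of the pitfall: each PumpAll() signals MessageEnd(), so the
// filter sees two messages and emits Hash(s1) followed by Hash(s2).
SHA256 hash2;
std::string two;
HashFilter hf2(hash2, new HexEncoder(new StringSink(two)));

StringSource ss2(s1, false);
ss2.Attach(new Redirector(hf2));
ss2.PumpAll();     // ends message #1
ss2.Detach();

FileSource fs2("test.dat", false);
fs2.Attach(new Redirector(hf2));
fs2.PumpAll();     // ends message #2
fs2.Detach();

// 'two' now holds two concatenated hex digests, not one digest over s1+f1
std::cout << "Two messages: " << two << std::endl;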
Downloads
No downloads.