Tag Archives: Global Deduplication

Pcompress 2.0 with Global Deduplication

The last few weeks I have been heads down busy with a multitude of things at work and in personal life with hardly any time for anything else. One of the biggest items that kept me busy during my spare times has of course been the release of Pcompress 2.0.

This release brings to fruition some of the hobby research work I had been doing around scalable deduplication of very large datasets. Pcompress 2.0 includes support for Global Deduplication which eliminates duplicate chunks across the entire dataset or file. Pcompress already had support for Data Deduplication but it removed duplicates only within a segment of the data. The larger the segment size, the more effective is the deduplication. This mode is very fast since there is no central index and no serialization. However dedupe effectiveness gets limited.

Global Deduplication introduces a central in-memory index for looking up chunk hashes. Data is first split into fixed-size or variable-length Rabin chunks as usual. Each 4KB (or larger) chunk of data has an associated 256-bit or larger cryptographic checksum (SHA256, BLAKE2 etc.). These hashes are looked up and inserted into a central hashtable. If a chunk hash entry is already present in the hashtable then the chunk is considered a duplicate and a reference to the existing chunk is inserted into the datastream. This is a simple full chunk index based exact deduplication approach which is very effective using 4KB chunk sizes. However there is a problem.

The size of a full chunk index grows rapidly with the dataset. If we are looking at 4KB chunks then we get 268435456 chunks for 1TB of data. Each chunk entry in the hashtable needs to have the 256-bit checksum, a 64-bit file offset and a 32-bit length value. So total size of the index entries is approax 11GB for unique data not considering the additional overheads of the hashtable structure. So if we consider hundreds of terabytes then the index is too big to fit in memory. In fact the index becomes so big that it becomes very costly to lookup chunk hashes slowing the dedupe process to a crawl. Virtually all commercial dedupe products do not even use 4KB chunks. The minimum is 8KB used in Data Domain with most other products using chunk sizes much larger than that. Larger chunk sizes reduce the index size but also reduce dedupe effectiveness.

One of the ways of scaling Data Deduplication to petascale is to look at similarity matching techniques that can determine regions of data that are approximately similar to each other and then compare their cryptographic chunk hashes to actually eliminate exact matching chunks. A variant of this technique uses Delta Differencing instead of hash matching to eliminate redundancy at the byte level. However I was interested in the former.

Pcompress 2.0 includes two approaches to Global Deduplication. If a simple full chunk index can fit into 75% of available free RAM then it is used directly. This is fast and most effective at eliminating duplicates. By default 4KB chunks are used and it gives good performance even with chunks this small. This is lower than what most other commercial or open-source dedupe products recommend or offer. Once file sizes start becoming larger and the index size overflows the memory limit then Pcompress automatically switches to Segmented Similarity Based Deduplication.

In Segmented Similarity mode data is split into 4KB (or larger) chunks as usual (Variable-length Rabin or Fixed-block). Then groups of 2048 chunks are collected to form a segment. With 4KB chunks this results in an average segment size of 8MB. The list of cryptographic chunk hashes for the segment are stored in a temporary segment file. Then these cryptographic chunks hashes are analysed to produce 25 similarity hashes. Each similarity hash is essentially a 64-bit CRC of a min-value entry. These hashes are then inserted or looked up in a central index. If another segment is found that matches at least one of the 25 similarity hashes then that segment is considered approximately similar to the current segment. It’s chunk hash list is then memory mapped into the process address space and exact crypto hash based chunk matching is done to perform the actual deduplication.

This approach results is an index size that is approximately 0.0023% of the dataset size. So Pcompress will require upto a 25GB index to deduplicate 1PB of data. That is assuming 100% random 1PB data with no duplicates. In practice the index will be smaller. This approach provides >90% dedupe efficiency of using a full chunk index while providing high scalability. Even though disk I/O is not completely avoided, it requires one disk write and only a few disk reads for every 2048 chunks. To balance performance and predictable behaviour, the write is synced to disk after every few segments. Using mmap(), instead of a read, helps performance and the disk offsets to be mmap-ed are sorted in ascending order to reduce random access to the segment chunk list file. This file is always written to at the end and extended but existing data is never modified. So it is ideal to place it on a Solid State drive to get a very good performance boost. Finally, access to the central index is coordinated by the threads cooperating using a set of semaphores allowing for lock-free access to critical sections. See: https://moinakg.wordpress.com/2013/03/26/coordinated-parallelism-using-semaphores/

I had been working out the details of this approach for quite a while now and Pcompress 2.0 contains the practical implementation of it. In addition to this Pcompress now includes two additional streaming modes. When compressing a file the output file can be specified as ‘-‘ to stream the compressed data to stdout. Decompression can take the input file as ‘-‘ to read compressed data from stdin.

Global Deduplication in Pcompress together with streaming modes and with help from utilities like Netcat or Ncat can be used to optimize network transfer of large datasets. Eventually I intend to implement proper WAN Optimization capabilities in a later release.

Related Research

  1. SiLo: A Similarity-Locality based Near-Exact Deduplication
  2. The Design of a Similarity Based Deduplication System
  3. Sparse Indexing: Large Scale, Inline Deduplication Using Sampling and Locality
  4. Similarity Based Deduplication with Small Data Chunks

Coordinated Parallelism using Semaphores

teamOne of the key features of my Pcompress utility is of course, parallelism. The ability to split and process data in parallel in multiple threads with all the threads doing virtually the same work. There is some limited variability depending on the nature of the work and the nature of the data. For example some data segments may not be compressible so they will have to be stored as-is. Whenever there are multiple threads there is typically some need for synchronization. There are of course scenarios where thread processing is completely independent and no synchronization is needed whatsoever. However in Pcompress that is not the case. There are a couple of cases where synchronization is needed:

  1. Ordering of input data segments. The data segments in the compressed file must be written in the same order as they were input otherwise data will be corrupt.
  2. With the Global Deduplication feature, access to a single chunk index must be serialized and ordered in the same sequence as they were input.

The second feature is a recent addition and also requires ordering since we want all duplicate chunk references to be backward references. That is in a data stream duplicate chunks point backwards to whole chunks at the head of the stream. So data segments containing the chunks must go through the index lookup in the same order as they were input. Rest of the processing like actual pre-compression stage, chunk splitting stage, compression stage, optional encryption stage and so on can work completely parallel without dependencies. The flow can be illustrated by the following diagram:

parallel_flow

As you can notice there are 3 points where some form of  synchronization is necessary. The input, Index Lookup for global dedupe and final writer stage. In addition data ordering as per input has to be maintained for index lookup and when writing the output data.

There are several ways of achieving this flow, the most common techniques are using a thread pool and some queues. Perhaps the simplest approach is to use barrier synchronization. We can put one barrier prior to the index lookup and another barrier prior to the writer. In each case a simple loop takes care of the serial processing maintaining the proper data ordering. However both the approaches have drawbacks. Using queues and thread pools have resource overheads for the data structures and locking.  Barriers are not strictly needed here and using barriers mean that some amount of potential concurrency is lost waiting at the barrier. The time spent waiting at the barrier is the time taken for the slowest or typically the last thread to complete processing. One of the intentions I had was to have as much overlapped processing as possible. if one thread is accessing the index and another thread does not need it, then, it should be allowed to proceed.

So I played around with POSIX semaphores. Using semaphores in a producer-consumer setup is a common approach. However Pcompress threads are a little more involved than simple producers and consumers. A bunch of semaphores are needed to signal and control the execution flow of the threads. After some experimentation I came up with the following approach.

A dispatcher thread reads data segments from the input file and schedules them to worker threads in a round robin fashion and the writer thread reads processed data segments from worker threads in a round robin loop as well. This preserves data ordering at input and output. The ordering of index lookup and dedupe processing is done by one thread signaling the other. The diagram below illustrates this showing an example with 2 threads.

parallel_flow

The green arrows in the diagram shows the direction of the semaphore signals. At each synchronization point a semaphore is signaled to indicate completion of some activity. The critical section of the index lookup and update operation is highlighted in blue. Each thread holds a reference to the index semaphore of the next thread in sequence. The last thread holds a reference of the index semaphore of the first thread. Each thread first waits for it’s own index semaphore to be signaled, then performs the index update and signals the next guy to proceed. The dispatcher thread signals the index semaphore of the first thread to start the ball rolling. Effectively this approach is equivalent to a round-robin token ring network. Whoever holds the token can access the common resource. Lock contention is completely avoided, so this can potentially scale to thousands of threads.

The key to data ordering are the two loops, one in the dispatcher and one in the writer thread. The dispatcher always assigns data segments to threads in a fixed order. In the above example Thread 1 gets all the odd segments and Thread 2 gets all the even ones. The writer thread also waits for threads in order eventually ensuring that data ordering is preserved.

Looking at all this in another way the synchronization approach can be viewed simplified as three concentric rings and the processing flows are a set of radii converging to the center of the circle and intersecting the rings. The processing flow direction is inwards towards the center and all the tokens flow along the rings in one direction, for example clockwise (black arrows). The green curved arrows show signaling of the synch points to forward tokens. That is when processing flow reaches the writer sink ring it forwards the token it received at the dedupe ring to the next flow. The final sync point at the centre completes the data write and forwards the token at the previous radius intersection point on the outermost ring. This approach ensures ordering and avoids races. To have maximum concurrency right from the beginning, all the synch points on the outermost ring get one-time-use tokens so all the initial processing can begin immediately. This is somewhat like priming a water pump.

parallel_flow_ring

This flow allows overlapped operations to happen concurrently. In addition the dispatcher does a simple double buffering by reading the next segment into a spare buffer after signaling the current thread to start processing. A bit of concurrency can be lost when the writer thread is waiting for thread 1 and thread 2 has already completed. That situation typically arises at the end of a file where the last segment can be a small one. It can also arise if one segment cannot be deduplicated and the rest of the dedupe processing is aborted. However the impact of these are relatively small compared to the overall processing being done, so a lot of multi-core parallelism is effectively utilized in practice. Finally a bunch of overheads in using specific data structures and/or parallel threading libraries are also avoided.