zlib/examples/pigz.c
2011-09-09 23:26:40 -07:00

453 lines
16 KiB
C

/* pigz.c -- parallel implementation of gzip
* Copyright (C) 2007 Mark Adler
* Version 1.1 28 January 2007 Mark Adler
*/
/* Version history:
1.0 17 Jan 2007 First version
1.1 28 Jan 2007 Avoid void * arithmetic (some compilers don't get that)
Add note about requiring zlib 1.2.3
Allow compression level 0 (no compression)
Completely rewrite parallelism -- add a write thread
Use deflateSetDictionary() to make use of history
Tune argument defaults to best performance on four cores
*/
/*
pigz compresses from stdin to stdout using threads to make use of multiple
processors and cores. The input is broken up into 128 KB chunks, and each
is compressed separately. The CRC for each chunk is also calculated
separately. The compressed chunks are written in order to the output,
and the overall CRC is calculated from the CRC's of the chunks.
The compressed data format generated is the gzip format using the deflate
compression method. First a gzip header is written, followed by raw deflate
partial streams. They are partial, in that they do not have a terminating
block. At the end, the deflate stream is terminated with a final empty
static block, and lastly a gzip trailer is written with the CRC and the
number of input bytes.
Each raw deflate partial stream is terminated by an empty stored block
(using the Z_SYNC_FLUSH option of zlib), in order to end that partial
bit stream at a byte boundary. That allows the partial streams to be
concantenated simply as sequences of bytes. This adds a very small four
or five byte overhead to the output for each input chunk.
zlib's crc32_combine() routine allows the calcuation of the CRC of the
entire input using the independent CRC's of the chunks. pigz requires zlib
version 1.2.3 or later, since that is the first version that provides the
crc32_combine() function.
pigz uses the POSIX pthread library for thread control and communication.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include "zlib.h"
#define local static
/* exit with error */
local void bail(char *msg)
{
fprintf(stderr, "pigz abort: %s\n", msg);
exit(1);
}
/* read up to len bytes into buf, repeating read() calls as needed */
local size_t readn(int desc, unsigned char *buf, size_t len)
{
ssize_t ret;
size_t got;
got = 0;
while (len) {
ret = read(desc, buf, len);
if (ret < 0)
bail("read error");
if (ret == 0)
break;
buf += ret;
len -= ret;
got += ret;
}
return got;
}
/* write len bytes, repeating write() calls as needed */
local void writen(int desc, unsigned char *buf, size_t len)
{
ssize_t ret;
while (len) {
ret = write(desc, buf, len);
if (ret < 1)
bail("write error");
buf += ret;
len -= ret;
}
}
/* a flag variable for communication between two threads */
struct flag {
int value; /* value of flag */
pthread_mutex_t lock; /* lock for checking and changing flag */
pthread_cond_t cond; /* condition for signaling on flag change */
};
/* initialize a flag for use, starting with value val */
local void flag_init(struct flag *me, int val)
{
me->value = val;
pthread_mutex_init(&(me->lock), NULL);
pthread_cond_init(&(me->cond), NULL);
}
/* set the flag to val, signal another process that may be waiting for it */
local void flag_set(struct flag *me, int val)
{
pthread_mutex_lock(&(me->lock));
me->value = val;
pthread_cond_signal(&(me->cond));
pthread_mutex_unlock(&(me->lock));
}
/* if it isn't already, wait for some other thread to set the flag to val */
local void flag_wait(struct flag *me, int val)
{
pthread_mutex_lock(&(me->lock));
while (me->value != val)
pthread_cond_wait(&(me->cond), &(me->lock));
pthread_mutex_unlock(&(me->lock));
}
/* if flag is equal to val, wait for some other thread to change it */
local void flag_wait_not(struct flag *me, int val)
{
pthread_mutex_lock(&(me->lock));
while (me->value == val)
pthread_cond_wait(&(me->cond), &(me->lock));
pthread_mutex_unlock(&(me->lock));
}
/* clean up the flag when done with it */
local void flag_done(struct flag *me)
{
pthread_cond_destroy(&(me->cond));
pthread_mutex_destroy(&(me->lock));
}
/* a unit of work to feed to compress_thread() -- it is assumed that the out
buffer is large enough to hold the maximum size len bytes could deflate to,
plus five bytes for the final sync marker */
struct work {
size_t len; /* length of input */
unsigned long crc; /* crc of input */
unsigned char *buf; /* input */
unsigned char *out; /* space for output (guaranteed big enough) */
z_stream strm; /* pre-initialized z_stream */
struct flag busy; /* busy flag indicating work unit in use */
pthread_t comp; /* this compression thread */
};
/* busy flag values */
#define IDLE 0 /* compress and writing done -- can start compress */
#define COMP 1 /* compress -- input and output buffers in use */
#define WRITE 2 /* compress done, writing output -- can read input */
/* read-only globals (set by main/read thread before others started) */
local int ind; /* input file descriptor */
local int outd; /* output file descriptor */
local int level; /* compression level */
local int procs; /* number of compression threads (>= 2) */
local size_t size; /* uncompressed input size per thread (>= 32K) */
local struct work *jobs; /* work units: jobs[0..procs-1] */
/* next and previous jobs[] indices */
#define NEXT(n) ((n) == procs - 1 ? 0 : (n) + 1)
#define PREV(n) ((n) == 0 ? procs - 1 : (n) - 1)
/* sliding dictionary size for deflate */
#define DICT 32768U
/* largest power of 2 that fits in an unsigned int -- used to limit requests
to zlib functions that use unsigned int lengths */
#define MAX ((((unsigned)-1) >> 1) + 1)
/* compress thread: compress the input in the provided work unit and compute
its crc -- assume that the amount of space at job->out is guaranteed to be
enough for the compressed output, as determined by the maximum expansion
of deflate compression -- use the input in the previous work unit (if there
is one) to set the deflate dictionary for better compression */
local void *compress_thread(void *arg)
{
size_t len; /* input length for this work unit */
unsigned long crc; /* crc of input data */
struct work *prev; /* previous work unit */
struct work *job = arg; /* work unit for this thread */
z_stream *strm = &(job->strm); /* zlib stream for this work unit */
/* reset state for a new compressed stream */
(void)deflateReset(strm);
/* initialize input, output, and crc */
strm->next_in = job->buf;
strm->next_out = job->out;
len = job->len;
crc = crc32(0L, Z_NULL, 0);
/* set dictionary if this isn't the first work unit, and if we will be
compressing something (the read thread assures that the dictionary
data in the previous work unit is still there) */
prev = jobs + PREV(job - jobs);
if (prev->buf != NULL && len != 0)
deflateSetDictionary(strm, prev->buf + (size - DICT), DICT);
/* run MAX-sized amounts of input through deflate and crc32 -- this loop
is needed for those cases where the integer type is smaller than the
size_t type, or when len is close to the limit of the size_t type */
while (len > MAX) {
strm->avail_in = MAX;
strm->avail_out = (unsigned)-1;
crc = crc32(crc, strm->next_in, strm->avail_in);
(void)deflate(strm, Z_NO_FLUSH);
len -= MAX;
}
/* run last piece through deflate and crc32, follow with a sync marker */
if (len) {
strm->avail_in = len;
strm->avail_out = (unsigned)-1;
crc = crc32(crc, strm->next_in, strm->avail_in);
(void)deflate(strm, Z_SYNC_FLUSH);
}
/* don't need to Z_FINISH, since we'd delete the last two bytes anyway */
/* return result */
job->crc = crc;
return NULL;
}
/* put a 4-byte integer into a byte array in LSB order */
#define PUT4(a,b) (*(a)=(b),(a)[1]=(b)>>8,(a)[2]=(b)>>16,(a)[3]=(b)>>24)
/* write thread: wait for compression threads to complete, write output in
order, also write gzip header and trailer around the compressed data */
local void *write_thread(void *arg)
{
int n; /* compress thread index */
size_t len; /* length of input processed */
unsigned long tot; /* total uncompressed size (overflow ok) */
unsigned long crc; /* CRC-32 of uncompressed data */
unsigned char wrap[10]; /* gzip header or trailer */
/* write simple gzip header */
memcpy(wrap, "\037\213\10\0\0\0\0\0\0\3", 10);
wrap[8] = level == 9 ? 2 : (level == 1 ? 4 : 0);
writen(outd, wrap, 10);
/* process output of compress threads until end of input */
tot = 0;
crc = crc32(0L, Z_NULL, 0);
n = 0;
do {
/* wait for compress thread to start, then wait to complete */
flag_wait(&(jobs[n].busy), COMP);
pthread_join(jobs[n].comp, NULL);
/* now that compress is done, allow read thread to use input buffer */
flag_set(&(jobs[n].busy), WRITE);
/* write compressed data and update length and crc */
writen(outd, jobs[n].out, jobs[n].strm.next_out - jobs[n].out);
len = jobs[n].len;
tot += len;
crc = crc32_combine(crc, jobs[n].crc, len);
/* release this work unit and go to the next work unit */
flag_set(&(jobs[n].busy), IDLE);
n = NEXT(n);
/* an input buffer less than size in length indicates end of input */
} while (len == size);
/* write final static block and gzip trailer (crc and len mod 2^32) */
wrap[0] = 3; wrap[1] = 0;
PUT4(wrap + 2, crc);
PUT4(wrap + 6, tot);
writen(outd, wrap, 10);
return NULL;
}
/* one-time initialization of a work unit -- this is where we set the deflate
compression level and request raw deflate, and also where we set the size
of the output buffer to guarantee enough space for a worst-case deflate
ending with a Z_SYNC_FLUSH */
local void job_init(struct work *job)
{
int ret; /* deflateInit2() return value */
job->buf = malloc(size);
job->out = malloc(size + (size >> 11) + 10);
job->strm.zfree = Z_NULL;
job->strm.zalloc = Z_NULL;
job->strm.opaque = Z_NULL;
ret = deflateInit2(&(job->strm), level, Z_DEFLATED, -15, 8,
Z_DEFAULT_STRATEGY);
if (job->buf == NULL || job->out == NULL || ret != Z_OK)
bail("not enough memory");
}
/* compress ind to outd in the gzip format, using multiple threads for the
compression and crc calculation and another thread for writing the output --
the read thread is the main thread */
local void read_thread(void)
{
int n; /* general index */
size_t got; /* amount read */
pthread_attr_t attr; /* thread attributes (left at defaults) */
pthread_t write; /* write thread */
/* set defaults (not all pthread implementations default to joinable) */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
/* allocate and set up work list (individual work units will be initialized
as needed, in case the input is short), assure that allocation size
arithmetic does not overflow */
if (size + (size >> 11) + 10 < (size >> 11) + 10 ||
(ssize_t)(size + (size >> 11) + 10) < 0 ||
((size_t)0 - 1) / procs <= sizeof(struct work) ||
(jobs = malloc(procs * sizeof(struct work))) == NULL)
bail("not enough memory");
for (n = 0; n < procs; n++) {
jobs[n].buf = NULL;
flag_init(&(jobs[n].busy), IDLE);
}
/* start write thread */
pthread_create(&write, &attr, write_thread, NULL);
/* read from input and start compress threads (write thread will pick up
the output of the compress threads) */
n = 0;
do {
/* initialize this work unit if it's the first time it's used */
if (jobs[n].buf == NULL)
job_init(jobs + n);
/* read input data, but wait for last compress on this work unit to be
done, and wait for the dictionary to be used by the last compress on
the next work unit */
flag_wait_not(&(jobs[n].busy), COMP);
flag_wait_not(&(jobs[NEXT(n)].busy), COMP);
got = readn(ind, jobs[n].buf, size);
/* start compress thread, but wait for write to be done first */
flag_wait(&(jobs[n].busy), IDLE);
jobs[n].len = got;
pthread_create(&(jobs[n].comp), &attr, compress_thread, jobs + n);
/* mark work unit so write thread knows compress was started */
flag_set(&(jobs[n].busy), COMP);
/* go to the next work unit */
n = NEXT(n);
/* do until end of input, indicated by a read less than size */
} while (got == size);
/* wait for the write thread to complete -- the write thread will join with
all of the compress threads, so this waits for all of the threads to
complete */
pthread_join(write, NULL);
/* free up all requested resources and return */
for (n = procs - 1; n >= 0; n--) {
flag_done(&(jobs[n].busy));
(void)deflateEnd(&(jobs[n].strm));
free(jobs[n].out);
free(jobs[n].buf);
}
free(jobs);
pthread_attr_destroy(&attr);
}
/* Process arguments for level, size, and procs, compress from stdin to
stdout in the gzip format. Note that procs must be at least two in
order to provide a dictionary in one work unit for the other work
unit, and that size must be at least 32K to store a full dictionary. */
int main(int argc, char **argv)
{
int n; /* general index */
int get; /* command line parameters to get */
char *arg; /* command line argument */
/* set defaults -- 32 processes and 128K buffers was found to provide
good utilization of four cores (about 97%) and balanced the overall
execution time impact of more threads against more dictionary
processing for a fixed amount of memory -- the memory usage for these
settings and full use of all work units (at least 4 MB of input) is
16.2 MB
*/
level = Z_DEFAULT_COMPRESSION;
procs = 32;
size = 131072UL;
/* process command-line arguments */
get = 0;
for (n = 1; n < argc; n++) {
arg = argv[n];
if (*arg == '-') {
while (*++arg)
if (*arg >= '0' && *arg <= '9') /* compression level */
level = *arg - '0';
else if (*arg == 'b') /* chunk size in K */
get |= 1;
else if (*arg == 'p') /* number of processes */
get |= 2;
else if (*arg == 'h') { /* help */
fputs("usage: pigz [-0..9] [-b blocksizeinK]", stderr);
fputs(" [-p processes] < foo > foo.gz\n", stderr);
return 0;
}
else
bail("invalid option");
}
else if (get & 1) {
if (get & 2)
bail("you need to separate the -b and -p options");
size = (size_t)(atol(arg)) << 10; /* chunk size */
if (size < DICT)
bail("invalid option");
get = 0;
}
else if (get & 2) {
procs = atoi(arg); /* processes */
if (procs < 2)
bail("invalid option");
get = 0;
}
else
bail("invalid option (you need to pipe input and output)");
}
if (get)
bail("missing option argument");
/* do parallel compression from stdin to stdout (the read thread starts up
the write thread and the compression threads, and they all join before
the read thread returns) */
ind = 0;
outd = 1;
read_thread();
/* done */
return 0;
}