Friday, July 19, 2013

Streaming a Zip64 archive

One of my projects allowed the user to request a ZIP archive of files off the server, instead of downloading them individually. To avoid requiring (potentially) double the disk space, I wanted to stream the archive as I created it. Additionally, the total size of the uncompressed data could easily exceed 2^32 bytes, so I needed to use the Zip64 extensions.

In addition to providing the compression primitives, the excellent zlib package ships with a contributed ZIP generator, minizip. Unfortunately, this generator uses seek to move back and forth within the generated file, so it wasn't suitable for streaming.

I eventually wrote my own utility to do this streaming. It took quite a few tries, as the spec is confusing and ambiguous. Once I had made sense of it, I wrote the code to always assume the worst case (files larger than 4 GiB, more than 65,535 entries, and so on); this costs a few extra bytes per entry, but saves tons of conditional logic.

The most interesting bits, at least regarding the ambiguous wording of the specification, are when we add a single file to the archive in Zip64Streamer::addFile:

bool
Zip64Streamer::addFile( const string & file )
{
    DEBUG( "af: adding file " << QS( file ) );

    FileInfo fi;
    fi.path = m_sDir + "/" + file;
    fi.name = file;
    fi.offset = m_offset;

    fillDateTime( fi );

    // Zip64 extended-information extra field; the real sizes aren't
    // known yet, so we defer them to the data descriptor
    CharBuffer z64;
    write2( z64, Z64_EXTRA_FIELD_TAG );
    write2( z64, LENGTH_PLACEHOLDER );
    write8( z64, DEFER_UNCOMPRESSED_SIZE );
    write8( z64, DEFER_COMPRESSED_SIZE );
    fixupExtraFieldLength( z64 );

    // Unix extra field: timestamps and ownership
    CharBuffer unix;
    write2( unix, UNIX_EXTRA_FIELD_TAG );
    write2( unix, LENGTH_PLACEHOLDER );
    write4( unix, fi.stat_atime );
    write4( unix, fi.stat_mtime );
    write2( unix, UNIX_ZIP_UID );
    write2( unix, UNIX_ZIP_GID );
    fixupExtraFieldLength( unix );

    CharBuffer lh; // local header
    write4( lh, LOCAL_FILE_HEADER_SIG );
    write2( lh, VERSION_NEEDED_TO_EXTRACT_4_5 );
    write2( lh, GPB_DATA_DESC_FOLLOWS_DATA );
    write2( lh, COMPRESSION_METHOD_DEFLATE );
    write2( lh, fi.msdos_time );
    write2( lh, fi.msdos_date );
    write4( lh, DEFER_CRC32 );                 // real CRC goes in the data descriptor
    write4( lh, FORCE_Z64_COMPRESSED_SIZE );   // 0xFFFFFFFF: real size in Zip64 extra field
    write4( lh, FORCE_Z64_UNCOMPRESSED_SIZE ); // 0xFFFFFFFF: real size in Zip64 extra field
    write2( lh, static_cast< uint16_t >( fi.name.size() ) );
    write2( lh, static_cast< uint16_t >( z64.size() + unix.size() ) );

    FINE( "af: " << file << ": writing header" );

    emit( lh );
    emitCopy( fi.name );
    emit( z64 );
    emit( unix );

    emitCompressedData( fi );

    FINE( "af: " << file << ": writing descriptor" );
    CharBuffer dd; // data descriptor: CRC and sizes are known by now
    write4( dd, DATA_DESC_SIG );
    write4( dd, fi.crc32 );
    write8( dd, fi.compressed );   // 8-byte sizes, since this entry is Zip64
    write8( dd, fi.uncompressed );
    emit( dd );

    // save info for eventual use in central directory
    m_fileInfo.push_back( fi );

    return true;
}
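
For reference, the write2/write4/write8 helpers just append integers to a byte buffer in little-endian order, since everything in a ZIP file is little-endian. A minimal sketch of what they might look like -- the CharBuffer typedef here is my assumption, and the real implementations are in the repository:

#include <cstdint>
#include <vector>

typedef std::vector< char > CharBuffer;

// append the low nBytes bytes of v, least-significant byte first
static void writeLE( CharBuffer & buf, uint64_t v, size_t nBytes )
{
    for ( size_t i = 0; i < nBytes; ++i )
        buf.push_back( static_cast< char >( ( v >> ( 8 * i ) ) & 0xff ) );
}

static void write2( CharBuffer & buf, uint16_t v ) { writeLE( buf, v, 2 ); }
static void write4( CharBuffer & buf, uint32_t v ) { writeLE( buf, v, 4 ); }
static void write8( CharBuffer & buf, uint64_t v ) { writeLE( buf, v, 8 ); }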

After we've added all the files, we finalize the archive in the destructor:

Zip64Streamer::~Zip64Streamer()
{
    DEBUG( "dtor: finishing zip file" );

    // save start of central directory
    const uint64_t centralDirOffset( m_offset );

    // emit a central directory record for each file
    for ( const FileInfo & fi : m_fileInfo )
    {
        FINE( "dtor: adding central dir record for " << QS( fi.name ) );

        // Zip64 extra field again, this time with the real values
        CharBuffer z64;
        write2( z64, Z64_EXTRA_FIELD_TAG );
        write2( z64, LENGTH_PLACEHOLDER );
        write8( z64, fi.uncompressed );
        write8( z64, fi.compressed );
        write8( z64, fi.offset );
        fixupExtraFieldLength( z64 );

        CharBuffer unix;
        write2( unix, UNIX_EXTRA_FIELD_TAG );
        write2( unix, LENGTH_PLACEHOLDER );
        write4( unix, fi.stat_atime );
        write4( unix, fi.stat_mtime );
        write2( unix, UNIX_ZIP_UID );
        write2( unix, UNIX_ZIP_GID );
        fixupExtraFieldLength( unix );

        CharBuffer cd;
        write4( cd, CDIR_FILE_HEADER_SIG );
        write2( cd, VERSION_CREATED_BY_4_5_UNIX );
        write2( cd, VERSION_NEEDED_TO_EXTRACT_4_5 );
        write2( cd, GPB_DATA_DESC_FOLLOWS_DATA );
        write2( cd, COMPRESSION_METHOD_DEFLATE );
        write2( cd, fi.msdos_time );
        write2( cd, fi.msdos_date );
        write4( cd, fi.crc32 );
        write4( cd, FORCE_Z64_COMPRESSED_SIZE );   // 0xFFFFFFFF: real size in Zip64 extra field
        write4( cd, FORCE_Z64_UNCOMPRESSED_SIZE ); // 0xFFFFFFFF: real size in Zip64 extra field
        write2( cd, static_cast< uint16_t >( fi.name.size() ) );
        write2( cd, static_cast< uint16_t >( z64.size() + unix.size() ) );
        write2( cd, ZERO_COMMENT_LENGTH );
        write2( cd, DISK_START_ZERO );
        write2( cd, ZERO_INTERNAL_FILE_ATTR );
        write4( cd, UNIX_EXTERNAL_FILE_ATTR );
        write4( cd, FORCE_Z64_OFFSET ); // 0xFFFFFFFF: real offset is in the extra field

        emit( cd );
        emitCopy( fi.name );
        emit( z64 );
        emit( unix );
    }

    // how many bytes did that use?
    const uint64_t centralDirBytes( m_offset - centralDirOffset );

    // and where are we now?
    const uint64_t z64EndOfCentralDirLoc( m_offset );

    FINE( "dtor: central dir: "
          "bytes=" << centralDirBytes << ", "
          "offset=" << centralDirOffset );

    DEBUG( "dtor: adding z64 end of central directory record @ " << m_offset );
    CharBuffer z64;
    write4( z64, Z64_END_OF_CENTRAL_DIR_REC_SIG );
    write8( z64, LENGTH_PLACEHOLDER );
    write2( z64, VERSION_CREATED_BY_4_5_UNIX );
    write2( z64, VERSION_NEEDED_TO_EXTRACT_4_5 );
    write4( z64, DISK_NUMBER_ZERO );
    write4( z64, DISK_START_ZERO );
    write8( z64, m_fileInfo.size() ); // # dir entries on this disk
    write8( z64, m_fileInfo.size() ); // # dir entries total
    write8( z64, centralDirBytes );
    write8( z64, centralDirOffset );
    fixupRecordLength64( z64 );
    emit( z64 );

    DEBUG( "dtor: adding z64 end of central directory locator @ " << m_offset );
    CharBuffer loc;
    write4( loc, Z64_END_OF_CENTRAL_DIR_LOC_SIG );
    write4( loc, DISK_NUMBER_ZERO );
    write8( loc, z64EndOfCentralDirLoc );
    write4( loc, DISK_TOTAL_ONE );
    emit( loc );

    DEBUG( "dtor: adding end of central directory record @ " << m_offset);
    CharBuffer end;
    write4( end, END_OF_CENTRAL_DIR_SIG );
    write2( end, DISK_NUMBER_ZERO );
    write2( end, DISK_START_ZERO );
    write2( end, FORCE_Z64_LOCAL_ENTRIES );
    write2( end, FORCE_Z64_TOTAL_ENTRIES );
    write4( end, FORCE_Z64_CDIR_SIZE );
    write4( end, FORCE_Z64_CDIR_OFFSET );
    write2( end, ZERO_COMMENT_LENGTH );
    emit( end );

    DEBUG( "dtor: finalizing zlib" );
    deflateEnd( &m_zs );

    DEBUG( "dtor: done" );
}
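
Putting it all together, usage looks roughly like this. (A minimal sketch: the constructor arguments -- a source directory plus an output stream -- are stand-ins for the real interface, so check the repository for the actual signatures.)

#include <iostream>
#include <string>
#include <vector>

int main()
{
    // hypothetical constructor: where to read files from, and a sink
    // for the generated archive bytes; the real interface may differ
    Zip64Streamer zip( "/srv/files", std::cout );

    const std::vector< std::string > files = { "a.dat", "b.dat" };
    for ( const std::string & f : files )
        zip.addFile( f );

    // when zip goes out of scope, its destructor emits the central
    // directory and the (Zip64) end-of-central-directory records
    return 0;
}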

The full implementation can be found on GitHub at https://github.com/tkil/ajf-prog/tree/master/Zip64Streamer. Hopefully others will find it useful.

Happy hacking!

Wednesday, June 26, 2013

A simple, fast, unbounded, batched, thread-safe queue

In my current project, I needed to pass items from one thread to another. While this is a classic problem in multi-threaded programs, I wanted to handle items in batches, keep the critical sections as small as possible, and reduce dynamic memory allocation churn. Examples of this pattern came up in a watchdog implementation (where multiple threads queue "heartbeats" for the watchdog to consume), and in status recording (where multiple threads provide reports at different frequencies, and a single consumer folds them into the global status table).

The solution I settled on uses a mutex and two vectors (one for other threads to push new items onto, the other for the worker thread to consume). After I'm done with one batch, I simply swap the working vector with the queue. The only time the worker can block another thread is during that swap; for C++ vectors, swap is guaranteed to be O(1).

(The enqueue can block if it has to reallocate, but that should be progressively rarer as the program runs. Since we never actually create new vectors -- we just swap them back and forth -- any time either vector increases capacity, that capacity is preserved. This can be bootstrapped by using reserve() during startup.)

#include <vector>
#include <boost/thread.hpp>

class Service
{

public:

    enum { INIT_QUEUE_SIZE = 100 };

    struct Item { ... };

    Service()
        : queue(),
          running( true ),
          worker( &Service::runWorker, this )
    {
        // the worker thread may already be running, so take the
        // lock before touching the queue
        {
            Lock lock( mutex );
            queue.reserve( INIT_QUEUE_SIZE );
        }
    }

    ~Service()
    {
        {
            Lock lock( mutex );
            running = false;
        }
        worker.join();
    }

    void enqueue( const Item & item )
    {
        Lock lock( mutex );
        if ( running )
            queue.push_back( item );
    }

private:

    typedef std::vector< Item > ItemVec;
    typedef boost::mutex Mutex;
    typedef boost::unique_lock< Mutex > Lock;
    typedef boost::thread Thread;

    Mutex mutex;
    ItemVec queue;
    bool running;
    Thread worker;

    void runWorker()
    {
        ItemVec work;
        work.reserve( INIT_QUEUE_SIZE );
        bool done( false );
        while ( ! done )
        {
            {
                Lock lock( mutex );
                queue.swap( work );
                done = !running;
            }

            if ( work.empty() )
            {
                // sleep or wait on a condition variable
                continue;
            }

            // process elements in work

            work.clear();
        }
    }

};
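
From a producer thread, adding an item is then just a call to enqueue(). A minimal sketch (the Item fields and the processing step are placeholders):

Service service;

void someProducerThread()
{
    Service::Item item;
    // ... fill in the item's fields ...

    // takes the lock only long enough for a push_back
    service.enqueue( item );
}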