
2.3.3 Parallel Processing

STIL version 4 introduces support for parallel processing of tabular data. The framework somewhat resembles, but is not based on, parts of the streams framework from Java 8.

To perform a parallel operation on a StarTable, provide an instance of the class RowCollector and pass it, along with the target table, to the collect method of a suitable RowRunner. The RowRunner instance determines whether execution is sequential or parallel. RowRunner.DEFAULT is usually a suitable instance: it runs in parallel if there are many rows and multiple cores are available, and sequentially if there are few rows or the hardware provides only a single core.

The RowRunner accesses the table data using the table's getRowSplittable method. The RowSplittable thus obtained behaves a bit like a java.util.Spliterator, in that it can be recursively divided into smaller pieces amenable to parallel processing. Although all StarTables must implement the getRowSplittable method, actual splitting cannot always be implemented, so depending on the behaviour of the table in question there is no guarantee that processing will actually be performed in parallel.
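The recursive division mentioned above can be illustrated with the JDK's own java.util.Spliterator, which RowSplittable is said to resemble. The following is only a sketch of the general idea using standard JDK classes; it does not use STIL, and the class name SplitSketch and its method are hypothetical names chosen for illustration. As with a RowSplittable, a split request may be declined (trySplit returns null), in which case the piece is simply processed as a whole.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.IntConsumer;

class SplitSketch {

    /**
     * Recursively divides a spliterator until each piece holds no more
     * than maxSize elements (or refuses to split further), then sums
     * each piece and appends the partial sum to the supplied list.
     * In a real parallel framework the two halves would be handed
     * to different threads; here they are processed one after the other.
     */
    static void sumPieces( Spliterator.OfInt split, long maxSize,
                           List<Long> sums ) {
        // trySplit may return null if no further division is possible.
        Spliterator.OfInt prefix =
            split.estimateSize() > maxSize ? split.trySplit() : null;
        if ( prefix != null ) {
            sumPieces( prefix, maxSize, sums );
            sumPieces( split, maxSize, sums );
        }
        else {
            long[] acc = new long[ 1 ];
            IntConsumer adder = v -> acc[ 0 ] += v;
            split.forEachRemaining( adder );
            sums.add( acc[ 0 ] );
        }
    }
}
```

Calling sumPieces on, for instance, IntStream.rangeClosed( 1, 100 ).spliterator() fills the list with one partial sum per piece; adding up the partial sums gives the same total as a single sequential pass.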

Here is an example of how to sum the contents of a column using (potentially) parallel processing:

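    // Imports needed by this example.
    import java.io.IOException;
    import uk.ac.starlink.table.RowCollector;
    import uk.ac.starlink.table.RowRunner;
    import uk.ac.starlink.table.RowSplittable;
    import uk.ac.starlink.table.StarTable;

    static double sumColumnParallel( StarTable table, int icol ) throws IOException {
        // RowRunner.DEFAULT decides whether to run sequentially or in parallel.
        double[] acc = RowRunner.DEFAULT.collect( new SumCollector( icol ), table );
        return acc[ 0 ];
    }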

    /**
     * RowCollector implementation that sums values from a single column,
     * using a 1-element double[] array to accumulate values into.
     */
    static class SumCollector extends RowCollector<double[]> {
        final int icol_;
        SumCollector( int icol ) {
            icol_ = icol;
        }

        // Creates a fresh accumulator, initialised to zero.
        @Override
        public double[] createAccumulator() {
            return new double[ 1 ];
        }

        // Merges two partial sums into one.
        @Override
        public double[] combine( double[] acc1, double[] acc2 ) {
            acc1[ 0 ] += acc2[ 0 ];
            return acc1;
        }

        // Adds the values from one piece of the table to the accumulator;
        // cells that are not Numbers (including nulls) are skipped.
        @Override
        public void accumulateRows( RowSplittable rseq, double[] acc ) throws IOException {
            while ( rseq.next() ) {
                Object value = rseq.getCell( icol_ );
                if ( value instanceof Number ) {
                    acc[ 0 ] += ((Number) value).doubleValue();
                }
            }
        }
    }
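For readers familiar with Java 8 streams, the three methods of the collector above have rough counterparts in the supplier, accumulator and combiner arguments of the JDK's mutable reduction. The sketch below is an analogy only: it uses java.util.stream.DoubleStream rather than STIL, and the class name StreamSumAnalogy is hypothetical. One visible difference is that the streams accumulator is invoked once per value, whereas accumulateRows is handed a whole sequence of rows to iterate over.

```java
import java.util.stream.DoubleStream;

class StreamSumAnalogy {

    /**
     * Sums an array of values with a (potentially) parallel stream,
     * accumulating into a 1-element double[] array.
     */
    static double sum( double[] values ) {
        double[] acc = DoubleStream.of( values ).parallel().collect(
            () -> new double[ 1 ],              // cf. createAccumulator
            ( a, v ) -> a[ 0 ] += v,            // cf. accumulateRows
            ( a1, a2 ) -> a1[ 0 ] += a2[ 0 ]    // cf. combine
        );
        return acc[ 0 ];
    }
}
```

In both cases each thread works on its own accumulator, so no synchronization is needed inside the accumulation step; partial results are only merged in the combine step. Because the order in which partial sums are merged can vary, floating point results may differ slightly between runs.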

The level of parallelism available from the JVM is determined from the system property java.util.concurrent.ForkJoinPool.common.parallelism. If that property is not set explicitly, the level normally defaults to one less than the number of processing cores on the current machine. Parallel processing can be inhibited by setting the property to 1.
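To check what level of parallelism a given JVM will offer, the standard JDK class java.util.concurrent.ForkJoinPool can be queried directly. The following sketch uses only JDK calls; the class name ParallelismCheck and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismCheck {

    /** Name of the system property that configures the common pool. */
    static final String PROP =
        "java.util.concurrent.ForkJoinPool.common.parallelism";

    /** Returns the parallelism level the common pool is actually using. */
    static int effectiveParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    /** Reports the explicitly configured value (if any) and the effective one. */
    static String describe() {
        String configured = System.getProperty( PROP );
        return "configured=" + ( configured == null ? "(unset)" : configured )
             + ", effective=" + effectiveParallelism()
             + ", cores=" + Runtime.getRuntime().availableProcessors();
    }
}
```

The property is read when the common pool is first initialised, so it is most reliably supplied on the JVM command line, for instance with the flag -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, rather than set later from application code.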



STIL - Starlink Tables Infrastructure Library
Starlink User Note 252
STIL web page: http://www.starlink.ac.uk/stil/
Author email: m.b.taylor@bristol.ac.uk