~jgillis/apache-arrow-reading-writing

56fdafe55045b039553020c6a2f83d0e14fc4723 — Joris Gillis 4 years ago d2b196f
Single column filter & ByteString

Adds an application that uses even fewer columns of the dataset.
Uses ByteString from the okio library to work with the byte[] representation of Strings.
M pom.xml => pom.xml +6 -0
@@ 36,6 36,12 @@
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>

        <dependency>
            <groupId>com.squareup.okio</groupId>
            <artifactId>okio</artifactId>
            <version>1.15.0</version>
        </dependency>
    </dependencies>

    <build>

A src/main/java/org/example/arrow/io/FilterSingleColumnApplication.java => src/main/java/org/example/arrow/io/FilterSingleColumnApplication.java +127 -0
@@ 0,0 1,127 @@
package org.example.arrow.io;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import okio.ByteString;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/**
 * Small application to read chunked data from people.arrow file, and do some analytics on them:
 * - filter people living in a street ending with 'way'
 * - group by city
 * - aggregate average age
 */
public class FilterSingleColumnApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(FilterSingleColumnApplication.class);

    /**
     * Main method: reading batches, filtering and aggregating.
     *
     * @throws IOException If reading from Arrow file fails
     */
    private void doAnalytics() throws IOException {
        // All three resources are AutoCloseable. Closing the reader releases the loaded
        // record-batch buffers, and closing the allocator releases its off-heap memory;
        // previously neither was closed, leaking direct memory.
        try (RootAllocator allocator = new RootAllocator();
             FileInputStream fd = new FileInputStream("people.arrow");
             ArrowFileReader fileReader = new ArrowFileReader(new SeekableReadChannel(fd.getChannel()), allocator)) {
            fileReader.initialize();
            VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();

            // Aggregate: Using ByteString as it is faster than creating a String from a byte[].
            // TreeMap keeps the per-city output in deterministic (lexicographic) order.
            Map<ByteString, Long> perCityCount = new TreeMap<>();
            Map<ByteString, Long> perCitySum = new TreeMap<>();
            processBatches(fileReader, schemaRoot, perCityCount, perCitySum);

            // Print results
            for (ByteString city : perCityCount.keySet()) {
                double average = (double) perCitySum.get(city) / perCityCount.get(city);
                LOGGER.info("City = {}; Average = {}", city, average);
            }
        }
    }

    /**
     * Read batches, apply filters and write aggregation values into aggregation data structures
     *
     * @param fileReader   Reads batches from Arrow file
     * @param schemaRoot   Schema root for read batches
     * @param perCityCount Aggregation of count per city
     * @param perCitySum   Aggregation of summed value per city
     * @throws IOException If reading the arrow file goes wrong
     */
    private void processBatches(ArrowFileReader fileReader,
                                VectorSchemaRoot schemaRoot,
                                Map<ByteString, Long> perCityCount,
                                Map<ByteString, Long> perCitySum) throws IOException {
        // Reading the data, one batch at a time
        while (fileReader.loadNextBatch()) {
            int[] selectedIndexes = filterOnStreet(schemaRoot).elements();

            aggregate(schemaRoot, selectedIndexes, perCityCount, perCitySum);
        }
    }

    /**
     * Given the selected indexes, it copies the aggregation values into aggregation vectors
     *
     * @param schemaRoot      Schema root of batch
     * @param selectedIndexes Indexes to aggregate
     * @param perCityCount    Aggregating counts per city
     * @param perCitySum      Aggregating sums per city
     */
    private void aggregate(VectorSchemaRoot schemaRoot,
                           int[] selectedIndexes,
                           Map<ByteString, Long> perCityCount,
                           Map<ByteString, Long> perCitySum) {
        VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
        UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");

        for (int selectedIndex : selectedIndexes) {
            ByteString city = ByteString.of(cityVector.get(selectedIndex));
            // Map.merge expresses the "increment or initialize" aggregation in one call
            // instead of put(getOrDefault(...) + x).
            perCityCount.merge(city, 1L, Long::sum);
            perCitySum.merge(city, (long) ageDataVector.get(selectedIndex), Long::sum);
        }
    }

    /**
     * Selects the rows whose street ends with 'way'.
     *
     * @param schemaRoot Schema root of the currently loaded batch
     * @return indexes (within the batch) of the matching rows, trimmed to size
     */
    private IntArrayList filterOnStreet(VectorSchemaRoot schemaRoot) {
        StructVector addressVector = (StructVector) schemaRoot.getVector("address");
        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");

        IntArrayList streetSelectedIndexes = new IntArrayList();
        // Arrow VarChar data is UTF-8; the bare getBytes() used the platform default
        // charset, which silently breaks the comparison on non-UTF-8 JVMs.
        byte[] suffixInBytes = "way".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < schemaRoot.getRowCount(); i++) {
            if (ByteString.of(streetVector.get(i)).endsWith(suffixInBytes)) {
                streetSelectedIndexes.add(i);
            }
        }
        streetSelectedIndexes.trim();
        return streetSelectedIndexes;
    }

    //========================================================================
    // Starting computation
    public static void main(String[] args) throws Exception {
        FilterSingleColumnApplication app = new FilterSingleColumnApplication();

        StopWatch stopWatch = StopWatch.createStarted();
        app.doAnalytics();
        stopWatch.stop();
        LOGGER.info("Timing: {}", stopWatch);
    }
}

M src/main/java/org/example/arrow/io/GenerateRandomDataApplication.java => src/main/java/org/example/arrow/io/GenerateRandomDataApplication.java +2 -2
@@ 15,7 15,7 @@ import java.util.List;
public class GenerateRandomDataApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenerateRandomDataApplication.class);
    private static final int CHUNK_SIZE = 1000;
    private static final int CHUNK_SIZE = 20_000;

    /**
     * Generates an array of random people.


@@ 77,7 77,7 @@ public class GenerateRandomDataApplication {

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        int numberOfPeople = 1_047_031;
        int numberOfPeople = 10_047_031;
        LOGGER.info("Generating {} people", numberOfPeople);
        Person[] people = app.randomPeople(numberOfPeople);
        stopWatch.split();

M src/main/java/org/example/arrow/io/ProcessDataApplication.java => src/main/java/org/example/arrow/io/ProcessDataApplication.java +17 -23
@@ 2,8 2,8 @@ package org.example.arrow.io;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import okio.ByteString;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;


@@ 18,7 18,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Small application to read chunked data from people.arrow file, and do some analytics on them:


@@ 46,13 45,13 @@ public class ProcessDataApplication {
            fileReader.initialize();
            VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();

            // Aggregate
            Map<String, Long> perCityCount = new HashMap<>();
            Map<String, Long> perCitySum = new HashMap<>();
            // Aggregate: Using ByteString as it is faster than creating a String from a byte[]
            Map<ByteString, Long> perCityCount = new HashMap<>();
            Map<ByteString, Long> perCitySum = new HashMap<>();
            processBatches(fileReader, schemaRoot, perCityCount, perCitySum);

            // Print results
            for (String city : perCityCount.keySet()) {
            for (ByteString city : perCityCount.keySet()) {
                double average = (double) perCitySum.get(city) / perCityCount.get(city);
                LOGGER.info("City = {}; Average = {}", city, average);
            }


@@ 70,8 69,8 @@ public class ProcessDataApplication {
     */
    private void processBatches(ArrowFileReader fileReader,
                                VectorSchemaRoot schemaRoot,
                                Map<String, Long> perCityCount,
                                Map<String, Long> perCitySum) throws IOException {
                                Map<ByteString, Long> perCityCount,
                                Map<ByteString, Long> perCitySum) throws IOException {
        // Reading the data, one batch at a time
        while (fileReader.loadNextBatch()) {
            IntArrayList lastNameSelectedIndexes = filterOnLastName(schemaRoot);


@@ 96,13 95,13 @@ public class ProcessDataApplication {
     */
    private void aggregate(VectorSchemaRoot schemaRoot,
                           int[] selectedIndexes,
                           Map<String, Long> perCityCount,
                           Map<String, Long> perCitySum) {
                           Map<ByteString, Long> perCityCount,
                           Map<ByteString, Long> perCitySum) {
        VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
        UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");

        for (int selectedIndex : selectedIndexes) {
            String city = new String(cityVector.get(selectedIndex));
            ByteString city = ByteString.of(cityVector.get(selectedIndex));
            perCityCount.put(city, perCityCount.getOrDefault(city, 0L) + 1);
            perCitySum.put(city, perCitySum.getOrDefault(city, 0L) + ageDataVector.get(selectedIndex));
        }


@@ 112,8 111,9 @@ public class ProcessDataApplication {
    private IntArrayList filterOnLastName(VectorSchemaRoot schemaRoot) {
        VarCharVector lastName = (VarCharVector) schemaRoot.getVector("lastName");
        IntArrayList lastNameSelectedIndexes = new IntArrayList();
        byte[] prefixBytes = "P".getBytes();
        for (int i = 0; i < schemaRoot.getRowCount(); i++) {
            if (new String(lastName.get(i)).startsWith("P")) {
            if (ByteString.of(lastName.get(i)).startsWith(prefixBytes)) {
                lastNameSelectedIndexes.add(i);
            }
        }


@@ 137,19 137,13 @@ public class ProcessDataApplication {

    // Keep street ending in 'way'
    private IntArrayList filterOnStreet(VectorSchemaRoot schemaRoot) {
        Optional<FieldVector> optionalStreet = schemaRoot.getVector("address")
                .getChildrenFromFields()
                .stream()
                .filter(fieldVector -> "street".equals(fieldVector.getField().getName()))
                .findFirst();

        if (!optionalStreet.isPresent()) {
            throw new IllegalArgumentException("Could not find street vector");
        }
        VarCharVector street = (VarCharVector) optionalStreet.get();
        StructVector addressVector = (StructVector) schemaRoot.getVector("address");
        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");

        IntArrayList streetSelectedIndexes = new IntArrayList();
        byte[] suffixBytes = "way".getBytes();
        for (int i = 0; i < schemaRoot.getRowCount(); i++) {
            if (new String(street.get(i)).endsWith("way")) {
            if (ByteString.of(streetVector.get(i)).endsWith(suffixBytes)) {
                streetSelectedIndexes.add(i);
            }
        }