package org.example.arrow.io;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import okio.ByteString;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/**
 * Small application to read chunked data from people.arrow file, and do some analytics on them:
 * - filter people living in a street ending with 'way'
 * - group by city
 * - aggregate average age
 */
public class FilterSingleColumnApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(FilterSingleColumnApplication.class);

    // Street suffix to filter on. Computed once with an explicit charset
    // (bare getBytes() would depend on the platform default encoding) and
    // hoisted so it is not re-allocated for every batch.
    private static final byte[] STREET_SUFFIX = "way".getBytes(StandardCharsets.UTF_8);

    /**
     * Main method: reading batches, filtering and aggregating.
     *
     * @throws IOException If reading from Arrow file fails
     */
    private void doAnalytics() throws IOException {
        RootAllocator allocator = new RootAllocator();

        // ArrowFileReader is AutoCloseable; closing it releases the buffers of the
        // batches it loaded, so it belongs in the try-with-resources as well.
        try (FileInputStream fd = new FileInputStream("people.arrow");
             ArrowFileReader fileReader = new ArrowFileReader(new SeekableReadChannel(fd.getChannel()), allocator)) {
            fileReader.initialize();
            VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();

            // Aggregate: Using ByteString as it is faster than creating a String from a byte[].
            // TreeMap keeps cities sorted for the printed report (ByteString is Comparable).
            Map<ByteString, Long> perCityCount = new TreeMap<>();
            Map<ByteString, Long> perCitySum = new TreeMap<>();
            processBatches(fileReader, schemaRoot, perCityCount, perCitySum);

            // Print results
            for (ByteString city : perCityCount.keySet()) {
                double average = (double) perCitySum.get(city) / perCityCount.get(city);
                LOGGER.info("City = {}; Average = {}", city, average);
            }
        }
    }

    /**
     * Read batches, apply filters and write aggregation values into aggregation data structures
     *
     * @param fileReader   Reads batches from Arrow file
     * @param schemaRoot   Schema root for read batches
     * @param perCityCount Aggregation of count per city
     * @param perCitySum   Aggregation of summed value per city
     * @throws IOException If reading the arrow file goes wrong
     */
    private void processBatches(ArrowFileReader fileReader,
                                VectorSchemaRoot schemaRoot,
                                Map<ByteString, Long> perCityCount,
                                Map<ByteString, Long> perCitySum) throws IOException {
        // Reading the data, one batch at a time. elements() is safe here because
        // filterOnStreet() trims its backing array to the exact selection size.
        while (fileReader.loadNextBatch()) {
            int[] selectedIndexes = filterOnStreet(schemaRoot).elements();

            aggregate(schemaRoot, selectedIndexes, perCityCount, perCitySum);
        }
    }

    /**
     * Given the selected indexes, it copies the aggregation values into aggregation vectors
     *
     * @param schemaRoot      Schema root of batch
     * @param selectedIndexes Indexes to aggregate
     * @param perCityCount    Aggregating counts per city
     * @param perCitySum      Aggregating sums per city
     */
    private void aggregate(VectorSchemaRoot schemaRoot,
                           int[] selectedIndexes,
                           Map<ByteString, Long> perCityCount,
                           Map<ByteString, Long> perCitySum) {
        VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
        UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");

        for (int selectedIndex : selectedIndexes) {
            ByteString city = ByteString.of(cityVector.get(selectedIndex));
            // merge() does one map lookup instead of the getOrDefault + put pair.
            perCityCount.merge(city, 1L, Long::sum);
            perCitySum.merge(city, (long) ageDataVector.get(selectedIndex), Long::sum);
        }
    }

    /**
     * Keep rows whose street ends in 'way'.
     *
     * @param schemaRoot Schema root of the currently loaded batch
     * @return indexes of the selected rows, trimmed to exact size
     */
    private IntArrayList filterOnStreet(VectorSchemaRoot schemaRoot) {
        StructVector addressVector = (StructVector) schemaRoot.getVector("address");
        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");

        IntArrayList streetSelectedIndexes = new IntArrayList();
        for (int i = 0; i < schemaRoot.getRowCount(); i++) {
            // ByteString comparison avoids allocating a String per row.
            if (ByteString.of(streetVector.get(i)).endsWith(STREET_SUFFIX)) {
                streetSelectedIndexes.add(i);
            }
        }
        // Trim so elements() exposes exactly the selected indexes, no slack capacity.
        streetSelectedIndexes.trim();
        return streetSelectedIndexes;
    }

    //========================================================================
    // Starting computation
    public static void main(String[] args) throws Exception {
        FilterSingleColumnApplication app = new FilterSingleColumnApplication();

        StopWatch stopWatch = StopWatch.createStarted();
        app.doAnalytics();
        stopWatch.stop();
        LOGGER.info("Timing: {}", stopWatch);
    }
}
a/src/main/java/org/example/arrow/io/GenerateRandomDataApplication.java
+++ b/src/main/java/org/example/arrow/io/GenerateRandomDataApplication.java
@@ -15,7 +15,7 @@ import java.util.List;
 public class GenerateRandomDataApplication {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(GenerateRandomDataApplication.class);
-    private static final int CHUNK_SIZE = 1000;
+    private static final int CHUNK_SIZE = 20_000;
 
     /**
      * Generates an array of random people.
@@ -77,7 +77,7 @@ public class GenerateRandomDataApplication {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
 
-        int numberOfPeople = 1_047_031;
+        int numberOfPeople = 10_047_031;
         LOGGER.info("Generating {} people", numberOfPeople);
         Person[] people = app.randomPeople(numberOfPeople);
         stopWatch.split();
diff --git a/src/main/java/org/example/arrow/io/ProcessDataApplication.java b/src/main/java/org/example/arrow/io/ProcessDataApplication.java
index 3f4f98f..475717c 100644
--- a/src/main/java/org/example/arrow/io/ProcessDataApplication.java
+++ b/src/main/java/org/example/arrow/io/ProcessDataApplication.java
@@ -2,8 +2,8 @@ package org.example.arrow.io;
 
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
+import okio.ByteString;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -18,7 +18,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Small application to read chunked data from people.arrow file, and do some analytics on them:
@@ -46,13 +45,13 @@ public class ProcessDataApplication {
             fileReader.initialize();
             VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();
 
-            // Aggregate
-            Map<String, Long> perCityCount = new HashMap<>();
-            Map<String, Long> perCitySum = new HashMap<>();
+            // Aggregate: Using ByteString as it is faster than creating a String from a byte[]
+            Map<ByteString, Long> perCityCount = new HashMap<>();
+            Map<ByteString, Long> perCitySum = new HashMap<>();
             processBatches(fileReader, schemaRoot, perCityCount, perCitySum);
 
             // Print results
-            for (String city : perCityCount.keySet()) {
+            for (ByteString city : perCityCount.keySet()) {
                 double average = (double) perCitySum.get(city) / perCityCount.get(city);
                 LOGGER.info("City = {}; Average = {}", city, average);
             }
@@ -70,8 +69,8 @@ public class ProcessDataApplication {
      */
     private void processBatches(ArrowFileReader fileReader,
                                 VectorSchemaRoot schemaRoot,
-                                Map<String, Long> perCityCount,
-                                Map<String, Long> perCitySum) throws IOException {
+                                Map<ByteString, Long> perCityCount,
+                                Map<ByteString, Long> perCitySum) throws IOException {
         // Reading the data, one batch at a time
         while (fileReader.loadNextBatch()) {
             IntArrayList lastNameSelectedIndexes = filterOnLastName(schemaRoot);
@@ -96,13 +95,13 @@ public class ProcessDataApplication {
      */
     private void aggregate(VectorSchemaRoot schemaRoot,
                            int[] selectedIndexes,
-                           Map<String, Long> perCityCount,
-                           Map<String, Long> perCitySum) {
+                           Map<ByteString, Long> perCityCount,
+                           Map<ByteString, Long> perCitySum) {
         VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
         UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");
 
         for (int selectedIndex : selectedIndexes) {
-            String city = new String(cityVector.get(selectedIndex));
+            ByteString city = ByteString.of(cityVector.get(selectedIndex));
             perCityCount.put(city, perCityCount.getOrDefault(city, 0L) + 1);
             perCitySum.put(city, perCitySum.getOrDefault(city, 0L) + ageDataVector.get(selectedIndex));
         }
@@ -112,8 +111,9 @@ public class ProcessDataApplication {
     private IntArrayList filterOnLastName(VectorSchemaRoot schemaRoot) {
         VarCharVector lastName = (VarCharVector) schemaRoot.getVector("lastName");
         IntArrayList lastNameSelectedIndexes = new IntArrayList();
+        byte[] prefixBytes = "P".getBytes();
         for (int i = 0; i < schemaRoot.getRowCount(); i++) {
-            if (new String(lastName.get(i)).startsWith("P")) {
+            if (ByteString.of(lastName.get(i)).startsWith(prefixBytes)) {
                 lastNameSelectedIndexes.add(i);
             }
         }
@@ -137,19 +137,13 @@ public class ProcessDataApplication {
 
     // Keep street ending in 'way'
     private IntArrayList filterOnStreet(VectorSchemaRoot schemaRoot) {
-        Optional<FieldVector> optionalStreet = schemaRoot.getVector("address")
-                .getChildrenFromFields()
-                .stream()
-                .filter(fieldVector -> "street".equals(fieldVector.getField().getName()))
-                .findFirst();
-
-        if (!optionalStreet.isPresent()) {
-            throw new IllegalArgumentException("Could not find street vector");
-        }
-        VarCharVector street = (VarCharVector) optionalStreet.get();
+        StructVector addressVector = (StructVector) schemaRoot.getVector("address");
+        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");
+
         IntArrayList streetSelectedIndexes = new IntArrayList();
+        byte[] suffixBytes = "way".getBytes();
         for (int i = 0; i < schemaRoot.getRowCount(); i++) {
-            if (new String(street.get(i)).endsWith("way")) {
+            if (ByteString.of(streetVector.get(i)).endsWith(suffixBytes)) {
                 streetSelectedIndexes.add(i);
             }
         }
-- 
2.45.2