~jgillis/flatbuffers-read-writing

939487f35b5a9ce7aae47ca7a83f9cc490abdc6d — Joris Gillis 4 years ago 2bb0af3
Processing data

Filtering on multiple columns & filtering on a single column
M pom.xml => pom.xml +10 -0
@@ 40,6 40,16 @@
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
        <dependency>
            <groupId>it.unimi.dsi</groupId>
            <artifactId>fastutil</artifactId>
            <version>8.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okio</groupId>
            <artifactId>okio</artifactId>
            <version>1.15.0</version>
        </dependency>
    </dependencies>

    <build>

M src/main/java/com/example/flatbuffers/GeneratingAndWriting.java => src/main/java/com/example/flatbuffers/GeneratingAndWriting.java +9 -0
@@ 4,6 4,7 @@ import com.example.flatbuffers.model.Address;
import com.example.flatbuffers.model.People;
import com.example.flatbuffers.model.Person;
import com.google.flatbuffers.FlatBufferBuilder;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



@@ 26,6 27,14 @@ public class GeneratingAndWriting {
    private static final String PERSON_LINE_TEMPLATE = " * {}";
    private static final String ADDRESS_LINE_TEMPLATE = "   * {} {}";

    /**
     * Entry point: generates a fixed number of people, hands them to
     * {@link #writingPeople(int)} and logs how long the whole run took.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        final StopWatch timer = StopWatch.createStarted();
        final int peopleToGenerate = 10_047_031;
        final GeneratingAndWriting generator = new GeneratingAndWriting();
        generator.writingPeople(peopleToGenerate);
        timer.stop();
        LOGGER.info("Generating and writing took: {}", timer);
    }

    public void writingPeople(int numberOfPeople) {
        // Writing out data to file
        try (FileOutputStream fd = new FileOutputStream("people.flatbuffers")) {

A src/main/java/com/example/flatbuffers/ReadingAndProcessing.java => src/main/java/com/example/flatbuffers/ReadingAndProcessing.java +71 -0
@@ 0,0 1,71 @@
package com.example.flatbuffers;

import com.example.flatbuffers.model.People;
import com.example.flatbuffers.model.Person;
import okio.ByteString;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Small application to read data from people.flatbuffers file, and do some analytics on them:
 * - filter people having a last name starting with 'P'
 * - filter people between the age of 18 and 35 (both bounds inclusive)
 * - filter people living in a street ending with 'way'
 * - group by city
 * - aggregate average age
 *
 * NOTE(review): the code assumes every person has a last name, an address, a street and a city
 * set; an absent field is not handled here — confirm against the writer.
 */
public class ReadingAndProcessing {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReadingAndProcessing.class);

    // Patterns are encoded explicitly as UTF-8 so the comparison does not depend on the
    // platform default charset of the JVM running this application.
    private static final ByteString LAST_NAME_PREFIX = ByteString.encodeUtf8("P");
    private static final ByteString STREET_SUFFIX = ByteString.encodeUtf8("way");

    private static final int MIN_AGE = 18;
    private static final int MAX_AGE = 35;

    public static void main(String[] args) {
        new ReadingAndProcessing().readAndProcess("people.flatbuffers");
    }

    /**
     * Reads the whole file into memory, filters the people with {@link #isMatchingPerson(Person)},
     * groups the matches by city and logs the average age per city together with the elapsed time.
     * An {@link IOException} while reading is logged and not rethrown.
     *
     * @param fileName path of the FlatBuffers file to read
     */
    public void readAndProcess(String fileName) {
        StopWatch stopWatch = StopWatch.createStarted();
        try (FileInputStream fd = new FileInputStream(fileName)) {
            LOGGER.info("Reading data ...");
            ByteBuffer data = ByteBuffer.wrap(fd.readAllBytes());

            People people = People.getRootAsPeople(data);
            // Read the vector length once instead of on every loop iteration.
            int numberOfPeople = people.peopleLength();
            LOGGER.info("Number of people: {}", numberOfPeople);

            LOGGER.info("Processing ...");
            Map<ByteString, Long> perCityCount = new HashMap<>();
            Map<ByteString, Long> perCitySum = new HashMap<>();
            for (int i = 0; i < numberOfPeople; i++) {
                Person person = people.people(i);
                if (isMatchingPerson(person)) {
                    ByteString city = ByteString.of(person.address().cityAsByteBuffer());
                    perCityCount.merge(city, 1L, Long::sum);
                    perCitySum.merge(city, (long) person.age(), Long::sum);
                }
            }

            LOGGER.info("Average age ...");
            for (Map.Entry<ByteString, Long> entry : perCityCount.entrySet()) {
                ByteString city = entry.getKey();
                long count = entry.getValue();
                long sum = perCitySum.getOrDefault(city, 0L);
                // Divide in double: a float cast loses precision once the sum exceeds 2^24.
                // utf8() logs the plain city name instead of ByteString's debug representation.
                LOGGER.info("{}: {}", city.utf8(), (double) sum / count);
            }
        } catch (IOException e) {
            LOGGER.error("Failed to read data", e);
        }
        stopWatch.stop();
        LOGGER.info("Took: {}", stopWatch);
    }

    /**
     * Tells whether a person passes all three filters: age within [18, 35], last name starting
     * with 'P' and street ending with 'way'.
     *
     * @param person the person to test
     * @return {@code true} when every filter matches
     */
    private boolean isMatchingPerson(Person person) {
        // The age check is a plain numeric comparison, so it runs first and short-circuits
        // the two string checks, which each copy bytes into a new ByteString.
        long age = person.age();
        return MIN_AGE <= age && age <= MAX_AGE
                && ByteString.of(person.lastNameAsByteBuffer()).startsWith(LAST_NAME_PREFIX)
                && ByteString.of(person.address().streetAsByteBuffer()).endsWith(STREET_SUFFIX);
    }
}

A src/main/java/com/example/flatbuffers/SingleColumnFilterApplication.java => src/main/java/com/example/flatbuffers/SingleColumnFilterApplication.java +66 -0
@@ 0,0 1,66 @@
package com.example.flatbuffers;

import com.example.flatbuffers.model.People;
import com.example.flatbuffers.model.Person;
import okio.ByteString;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Small application to read data from people.flatbuffers file, and do some analytics on them:
 * - filter people living in a street ending with 'way'
 * - group by city
 * - aggregate average age
 *
 * NOTE(review): the code assumes every person has an address with a street and a city set;
 * an absent field is not handled here — confirm against the writer.
 */
public class SingleColumnFilterApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(SingleColumnFilterApplication.class);

    // Encoded explicitly as UTF-8 so the comparison does not depend on the platform default
    // charset of the JVM running this application.
    private static final ByteString STREET_SUFFIX = ByteString.encodeUtf8("way");

    public static void main(String[] args) {
        new SingleColumnFilterApplication().readAndProcess("people.flatbuffers");
    }

    /**
     * Reads the whole file into memory, filters the people with {@link #isMatchingPerson(Person)},
     * groups the matches by city and logs the average age per city together with the elapsed time.
     * An {@link IOException} while reading is logged and not rethrown.
     *
     * @param fileName path of the FlatBuffers file to read
     */
    public void readAndProcess(String fileName) {
        StopWatch stopWatch = StopWatch.createStarted();
        try (FileInputStream fd = new FileInputStream(fileName)) {
            LOGGER.info("Reading data ...");
            ByteBuffer data = ByteBuffer.wrap(fd.readAllBytes());

            People people = People.getRootAsPeople(data);
            // Read the vector length once instead of on every loop iteration.
            int numberOfPeople = people.peopleLength();
            LOGGER.info("Number of people: {}", numberOfPeople);

            LOGGER.info("Processing ...");
            Map<ByteString, Long> perCityCount = new HashMap<>();
            Map<ByteString, Long> perCitySum = new HashMap<>();
            for (int i = 0; i < numberOfPeople; i++) {
                Person person = people.people(i);
                if (isMatchingPerson(person)) {
                    ByteString city = ByteString.of(person.address().cityAsByteBuffer());
                    perCityCount.merge(city, 1L, Long::sum);
                    perCitySum.merge(city, (long) person.age(), Long::sum);
                }
            }

            LOGGER.info("Average age ...");
            for (Map.Entry<ByteString, Long> entry : perCityCount.entrySet()) {
                ByteString city = entry.getKey();
                long count = entry.getValue();
                long sum = perCitySum.getOrDefault(city, 0L);
                // Divide in double: a float cast loses precision once the sum exceeds 2^24.
                // utf8() logs the plain city name instead of ByteString's debug representation.
                LOGGER.info("{}: {}", city.utf8(), (double) sum / count);
            }
        } catch (IOException e) {
            LOGGER.error("Failed to read data", e);
        }
        stopWatch.stop();
        LOGGER.info("Took: {}", stopWatch);
    }

    /**
     * Tells whether a person lives in a street whose name ends with 'way'.
     *
     * @param person the person to test
     * @return {@code true} when the street name has the expected suffix
     */
    private boolean isMatchingPerson(Person person) {
        ByteString street = ByteString.of(person.address().streetAsByteBuffer());
        return street.endsWith(STREET_SUFFIX);
    }
}