~badt/glavar

6a19d38e5b86c50505c328d0b100e77fd5f2db96 — Ian P Badtrousers 1 year, 10 months ago 5fdf233 fthinoporo
fthinoporo: multicore batching
1 files changed, 26 insertions(+), 25 deletions(-)

M fthinoporo/index.go
M fthinoporo/index.go => fthinoporo/index.go +26 -25
@@ 3,17 3,14 @@ package main
import (
	"database/sql"
	"fmt"
	"math/rand"
	"runtime"
	"strconv"
	"time"
	"sync"

	"github.com/blevesearch/bleve"
)

func index() {
	rand.Seed(time.Now().UnixNano())

	mapping := bleve.NewIndexMapping()
	idx, err := bleve.New("index", mapping)
	if err != nil {


@@ 27,35 24,32 @@ func index() {
	}
	defer db.Close()

	data, err := db.Query("SELECT * FROM logec ORDER BY time ASC")
	data, err := db.Query("SELECT * FROM logec ORDER BY time DESC")
	if err != nil {
		panic(err)
	}

	N, pipe := 0, make(chan M, 1000)
	const batchSize = 2500

	cpu := runtime.NumCPU()
	fmt.Println("cpu count:", cpu)

	var start time.Time
	batches := make(chan []M, cpu)
	wg := &sync.WaitGroup{}
	for i := 0; i < cpu; i++ {
		go func(i int) {
			n := 0
			for m := range pipe {
				idx.Index(strconv.FormatInt(m.T, 10), m)
				n++
				N++

				if rand.Intn(1000) == 7 {
					fmt.Printf("[%v] indexed:%d, total:%d\n",
						time.Now().Sub(start), n, N)
			wg.Add(1)
			defer wg.Done()
			for messages := range batches {
				b := idx.NewBatch()
				for _, m := range messages {
					b.Index(strconv.FormatInt(m.T, 10), m)
				}
				idx.Batch(b)
				fmt.Printf("[%d] batch %d\n", i, len(messages))
			}
		}(i)
	}

	fmt.Println("loading data")
	start = time.Now()
	A := make([]M, 0, batchSize)
	for data.Next() {
		var m M
		err = data.Scan(&m.T, &m.Login, &m.Text)


@@ 63,12 57,19 @@ func index() {
			fmt.Println("!!", err)
			return
		}
		pipe <- m
		if rand.Intn(1000) == 7 {
			t := time.Unix(0, m.T).Format(time.RFC822)
			fmt.Println(t)

		A = append(A, m)

		if len(A) == cap(A) {
			batches <- A
			A = make([]M, 0, batchSize)
		}
	}

	close(pipe)
	if len(A) != 0 {
		batches <- A
	}

	close(batches)
	wg.Wait()
}