@@ 3,17 3,14 @@ package main
import (
"database/sql"
"fmt"
- "math/rand"
"runtime"
"strconv"
- "time"
+ "sync"
"github.com/blevesearch/bleve"
)
func index() {
- rand.Seed(time.Now().UnixNano())
-
mapping := bleve.NewIndexMapping()
idx, err := bleve.New("index", mapping)
if err != nil {
@@ 27,35 24,32 @@ func index() {
}
defer db.Close()
- data, err := db.Query("SELECT * FROM logec ORDER BY time ASC")
+ data, err := db.Query("SELECT * FROM logec ORDER BY time DESC")
if err != nil {
panic(err)
}
- N, pipe := 0, make(chan M, 1000)
+ const batchSize = 2500
cpu := runtime.NumCPU()
- fmt.Println("cpu count:", cpu)
-
- var start time.Time
+ batches := make(chan []M, cpu)
+ wg := &sync.WaitGroup{}
for i := 0; i < cpu; i++ {
go func(i int) {
- n := 0
- for m := range pipe {
- idx.Index(strconv.FormatInt(m.T, 10), m)
- n++
- N++
-
- if rand.Intn(1000) == 7 {
- fmt.Printf("[%v] indexed:%d, total:%d\n",
- time.Now().Sub(start), n, N)
+ wg.Add(1)
+ defer wg.Done()
+ for messages := range batches {
+ b := idx.NewBatch()
+ for _, m := range messages {
+ b.Index(strconv.FormatInt(m.T, 10), m)
}
+ idx.Batch(b)
+ fmt.Printf("[%d] batch %d\n", i, len(messages))
}
}(i)
}
- fmt.Println("loading data")
- start = time.Now()
+ A := make([]M, 0, batchSize)
for data.Next() {
var m M
err = data.Scan(&m.T, &m.Login, &m.Text)
@@ 63,12 57,19 @@ func index() {
fmt.Println("!!", err)
return
}
- pipe <- m
- if rand.Intn(1000) == 7 {
- t := time.Unix(0, m.T).Format(time.RFC822)
- fmt.Println(t)
+
+ A = append(A, m)
+
+ if len(A) == cap(A) {
+ batches <- A
+ A = make([]M, 0, batchSize)
}
}
- close(pipe)
+ if len(A) != 0 {
+ batches <- A
+ }
+
+ close(batches)
+ wg.Wait()
}