~gioverse/chat

ref: ff42a2f8b59287707042461aa1ca1347fb8250eb chat/list/async.go -rw-r--r-- 4.3 KiB
ff42a2f8Chris Waldon list: update Loader to return if more elements 8 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package list

import (
	"fmt"
)

// updateType identifies which kind of request produced a stateUpdate.
type updateType uint8

const (
	// pull indicates the results from a load request. The update pulled
	// data from the data store.
	pull updateType = iota
	// push indicates the results from an asynchronous insertion of
	// data. The application pushed data to the list.
	push
)

// String returns the lowercase name of the update type, or "unknown" for
// any value that is neither pull nor push.
func (u updateType) String() string {
	if u == pull {
		return "pull"
	}
	if u == push {
		return "push"
	}
	return "unknown"
}

// stateUpdate contains a new slice of element data and a mapping from all of
// the element serials to their respective indices. This data structure is designed
// to allow the UI code to quickly find and update any offsets and locations
// within the new data.
type stateUpdate struct {
	Synthesis
	// CompactedSerials is a slice of Serials that were compacted within this update.
	CompactedSerials []Serial
	// Ignore reports which directions (if any) the async backend currently
	// believes to have no new content.
	Ignore Direction
	Type   updateType // Type records whether a load (pull) or an insertion (push) produced this update.
}

// String renders the update for debugging: the embedded Synthesis, the
// compacted serials and the ignored directions. The Type field is not
// part of the output.
func (s stateUpdate) String() string {
	const layout = "Synthesis: %v, Compacted: %v, Ignore: %v"
	return fmt.Sprintf(layout, s.Synthesis, s.CompactedSerials, s.Ignore)
}

// viewport represents a range of elements visible within a list.
// Start and End are the serials bounding that range; asyncProcess passes
// them to Compact so compaction can take the visible range into account.
type viewport struct {
	Start, End Serial
}

// asyncProcess starts a goroutine that owns the list state and serves
// requests sent on the returned request channel.
//
// maxSize and hooks.Comparator are handed to NewCompact, which holds the
// elements between requests. Three channels are returned, in order:
//
//   - the request channel (unbuffered), accepting modificationRequest and
//     loadRequest values. Closing it terminates the goroutine, which in
//     turn closes the update channel.
//   - the viewport channel (buffer of one), polled without blocking after
//     each request so that the latest visible range can be used when
//     compacting.
//   - the update channel (buffer of one), on which one stateUpdate is sent
//     per processed request. hooks.Invalidator is called after each send.
//
// A loadRequest for a direction that is currently ignored only records the
// request's viewport and produces no stateUpdate.
func asyncProcess(maxSize int, hooks Hooks) (chan<- interface{}, chan viewport, <-chan stateUpdate) {
	compact := NewCompact(maxSize, hooks.Comparator)
	var synthesis Synthesis
	reqChan := make(chan interface{})
	updateChan := make(chan stateUpdate, 1)
	viewports := make(chan viewport, 1)
	go func() {
		defer close(updateChan)
		var (
			// visible is the most recently observed viewport.
			visible viewport
			// ignore accumulates the directions believed to be exhausted.
			ignore Direction
		)
		for {
			var (
				su         stateUpdate
				newElems   []Element
				updateOnly []Element
				rmSerials  []Serial
			)
			req, ok := <-reqChan
			if !ok {
				// The request channel was closed: shut down.
				return
			}
			// Requests of any other type carry no data here; they still
			// trigger the apply/compact/synthesize cycle below.
			switch req := req.(type) {
			case modificationRequest:
				su.Type = push
				newElems = req.NewOrUpdate
				rmSerials = req.Remove
				updateOnly = req.UpdateOnly

				// Remove any elements that sort outside the boundaries of
				// the current list.
				SliceFilter(&newElems, func(elem Element) bool {
					if len(synthesis.Source) == 0 {
						return true
					}
					sortsBefore := compact.Comparator(elem, synthesis.Source[0])
					sortsAfter := compact.Comparator(synthesis.Source[len(synthesis.Source)-1], elem)
					// If this element sorts before the beginning of the list or after
					// the end of the list, it should not be inserted unless we are at
					// the appropriate end of the list.
					//
					// ignore is built up with Add and may therefore hold both
					// directions at once, so membership is tested with Contains
					// rather than with equality.
					switch {
					case sortsBefore && ignore.Contains(Before):
						return true
					case sortsAfter && ignore.Contains(After):
						return true
					case sortsBefore || sortsAfter:
						return false
					default:
						return true
					}
				})
				// A push may have made new content available, so stop
				// ignoring load requests.
				ignore = NoDirection
			case loadRequest:
				su.Type = pull
				visible = req.viewport
				if ignore.Contains(req.Direction) {
					continue
				}

				// Find the serial of the element at either end of the list.
				var loadSerial Serial
				switch req.Direction {
				case Before:
					loadSerial = synthesis.SerialAt(0)
				case After:
					loadSerial = synthesis.SerialAt(len(synthesis.Source) - 1)
				}
				// Load new elements.
				var more bool
				newElems, more = hooks.Loader(req.Direction, loadSerial)
				// Track whether all new elements in a given direction have been
				// exhausted.
				if len(newElems) == 0 || !more {
					ignore.Add(req.Direction)
				} else {
					ignore = NoDirection
				}
			}
			// Apply state updates.
			compact.Apply(newElems, updateOnly, rmSerials)

			// Update the viewport if there is a new one available.
			select {
			case visible = <-viewports:
			default:
			}

			// Fetch new contents and list of compacted content.
			contents, compacted := compact.Compact(visible.Start, visible.End)
			su.CompactedSerials = compacted
			// Synthesize elements based on new contents.
			synthesis = Synthesize(contents, hooks.Synthesizer)
			su.Synthesis = synthesis
			su.Ignore = ignore

			// This send blocks while the one-slot buffer is still full.
			updateChan <- su
			hooks.Invalidator()
		}
	}()
	return reqChan, viewports, updateChan
}