~gioverse/chat

ref: c86e075f0df33cc01ac99f167a09f981997727d0 chat/list/async.go -rw-r--r-- 4.0 KiB
c86e075fChris Waldon debug: add function to easily log structures 10 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package list

import (
	"fmt"
)

// stateUpdate contains a new slice of element data and a mapping from all of
// the element serials to their respective indicies. This data structure is designed
// to allow the UI code to quickly find and update any offsets and locations
// within the new data.
type stateUpdate struct {
	Synthesis
	// CompactedSerials is a slice of Serials that were compacted within this update.
	CompactedSerials []Serial
	// PreserveListEnd indicates whether or not the list.Position.BeforeEnd field
	// should be reset when applying this state update.
	PreserveListEnd bool
	// Ignore reports which directions (if any) the async backend currently
	// believes to have no new content.
	Ignore Direction
}

func (s stateUpdate) String() string {
	return fmt.Sprintf("Synthesis: %v, Compacted: %v, Preserve: %v", s.Synthesis, s.CompactedSerials, s.PreserveListEnd)
}

// viewport represents a range of elements visible within a list.
type viewport struct {
	Start, End Serial
}

// asyncProcess runs a list.processor concurrently.
// New elements are processed and compacted according to maxSize
// on each loadRequest. Close the loadRequest channel to terminate
// processing.
func asyncProcess(maxSize int, hooks Hooks) (chan<- interface{}, chan viewport, <-chan stateUpdate) {
	compact := NewCompact(maxSize, hooks.Comparator)
	var synthesis Synthesis
	reqChan := make(chan interface{})
	updateChan := make(chan stateUpdate, 1)
	viewports := make(chan viewport, 1)
	go func() {
		defer close(updateChan)
		var (
			viewport viewport
			ignore   Direction
		)
		for {
			var (
				su         stateUpdate
				newElems   []Element
				updateOnly []Element
				rmSerials  []Serial
			)
			select {
			case req, more := <-reqChan:
				if !more {
					return
				}
				switch req := req.(type) {
				case modificationRequest:
					newElems = req.NewOrUpdate
					rmSerials = req.Remove
					updateOnly = req.UpdateOnly
					su.PreserveListEnd = true

					/*
						Remove any elements that sort outside the boundaries of the
						current list.
					*/
					SliceFilter(&newElems, func(elem Element) bool {
						if len(synthesis.Source) == 0 {
							return true
						}
						sortsBefore := compact.Comparator(elem, synthesis.Source[0])
						sortsAfter := compact.Comparator(synthesis.Source[len(synthesis.Source)-1], elem)
						// If this element sorts before the beginning of the list or after
						// the end of the list, it should not be inserted unless we are at
						// the appropriate end of the list.
						switch {
						case sortsBefore && ignore == Before:
							return true
						case sortsAfter && ignore == After:
							return true
						case sortsBefore || sortsAfter:
							return false
						default:
							return true
						}
					})
					ignore = noDirection
				case loadRequest:
					viewport = req.viewport
					if ignore.Contains(req.Direction) {
						continue
					}

					// Find the serial of the element at either end of the list.
					var loadSerial Serial
					switch req.Direction {
					case Before:
						loadSerial = synthesis.SerialAt(0)
					case After:
						loadSerial = synthesis.SerialAt(len(synthesis.Source) - 1)
					}
					// Load new elements.
					newElems = append(newElems, hooks.Loader(req.Direction, loadSerial)...)
					// Track whether all new elements in a given direction have been
					// exhausted.
					if len(newElems) == 0 {
						ignore.Add(req.Direction)
					} else {
						ignore = noDirection
					}
				}
			}
			// Apply state updates.
			compact.Apply(newElems, updateOnly, rmSerials)

			// Update the viewport if there is a new one available.
			select {
			case viewport = <-viewports:
			default:
			}

			// Fetch new contents and list of compacted content.
			contents, compacted := compact.Compact(viewport.Start, viewport.End)
			su.CompactedSerials = compacted
			// Synthesize elements based on new contents.
			synthesis = Synthesize(contents, hooks.Synthesizer)
			su.Synthesis = synthesis
			su.Ignore = ignore

			updateChan <- su
			hooks.Invalidator()
		}
	}()
	return reqChan, viewports, updateChan
}