~whereswaldon/dynselect

dea8e15d9bb3e968770dbb85482dc7969fd278cd — Chris Waldon 10 months ago
dynselect: initial implementation

This is a tested, seemingly-correct implementation of this construct, so I guess
it's worth publishing.

Signed-off-by: Chris Waldon <christopher.waldon.dev@gmail.com>
5 files changed, 265 insertions(+), 0 deletions(-)

A LICENSE
A README.md
A dynselect_test.go
A go.mod
A main.go
A  => LICENSE +63 -0
@@ 1,63 @@
This project is provided under the terms of the UNLICENSE or
the MIT license denoted by the following SPDX identifier:

SPDX-License-Identifier: Unlicense OR MIT

You may use the project under the terms of either license.

Both licenses are reproduced below.

----
The MIT License (MIT)

Copyright (c) 2023 Chris Waldon

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
---



---
The UNLICENSE

This is free and unencumbered software released into the public domain.

Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.

In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

For more information, please refer to <https://unlicense.org/>
---

A  => README.md +7 -0
@@ 1,7 @@
# dynselect

This package implements a highly-situational, reflection-based `select` statement. It allows you to construct a runtime-modifiable pool of channels that are monitored for close with a `select`. This is useful primarily when monitoring a pool of contexts for cancellation if you need to perform some additional cleanup after detecting the context cancellation. See the tests for some example usages.

If you don't understand why this is useful, stay away. You definitely don't need it, and it may yet prove to be a bad idea in practice.

Dual UNLICENSE/MIT

A  => dynselect_test.go +90 -0
@@ 1,90 @@
package dynselect

import (
	"math/rand"
	"testing"
)

func TestDynSelect(t *testing.T) {
	send := make(chan (<-chan struct{}))

	closed := DynSelect(send)

	chans := make([]chan struct{}, 100)
	chanToIdx := make(map[<-chan struct{}]int)
	for i := range chans {
		chans[i] = make(chan struct{})
		chanToIdx[chans[i]] = i
		send <- chans[i]
	}
	close(send)

	for i := range chans {
		close(chans[i])
	}
	for closedChannels := range closed {
		for _, nextClose := range closedChannels {
			t.Logf("detected close for %d\n", chanToIdx[nextClose])
			delete(chanToIdx, nextClose)
		}
	}

	if len(chanToIdx) > 0 {
		t.Errorf("expected all channels to close, but %d did not", len(chanToIdx))
	}
}

func TestDynSelectTorture(t *testing.T) {
	send := make(chan (<-chan struct{}))

	closed := DynSelect(send)

	chans := make([]chan struct{}, 100)
	chanToIdx := make(map[<-chan struct{}]int)
	unclosed := make(map[chan struct{}]struct{})
	sent := 0
	for i := range chans {
		chans[i] = make(chan struct{})
		chanToIdx[chans[i]] = i
		unclosed[chans[i]] = struct{}{}
	}

	sendChan := func(c chan struct{}) {
		send <- c
		sent++
	}

	closeChan := func() {
		var key chan struct{}
		for key = range unclosed {
			close(key)
			break
		}
		delete(unclosed, key)
	}

	for sent < len(chans) || len(unclosed) > 0 {
		if sent < len(chans) && len(unclosed) > 0 {
			if rand.Intn(2) > 0 {
				sendChan(chans[sent])
			} else {
				closeChan()
			}
		} else if sent < len(chans) {
			sendChan(chans[sent])
		} else {
			closeChan()
		}
	}
	close(send)
	for closedChannels := range closed {
		for _, nextClose := range closedChannels {
			t.Logf("detected close for %d\n", chanToIdx[nextClose])
			delete(chanToIdx, nextClose)
		}
	}

	if len(chanToIdx) > 0 {
		t.Errorf("expected all channels to close, but %d did not", len(chanToIdx))
	}
}

A  => go.mod +3 -0
@@ 1,3 @@
module git.sr.ht/~whereswaldon/dynselect

go 1.21.1

A  => main.go +102 -0
@@ 1,102 @@
package dynselect

import (
	"context"
	"reflect"
	"runtime/trace"
	"slices"
)

// Select implements the dynamic supervision of a pool of channels that are expected to eventually
// close. You add channels to the supervised pool by sending them on the provided add channel, and channels
// that close while supervised are emitted on the returned closed channel.
//
// This function spawns a new goroutine to perform the supervision. This goroutine will shut down when
// the provided add channel is closed and all supervised channels close. The returned closed channel
// will close when the goroutine exits.
func Select[T any](add <-chan <-chan T) (closed <-chan []<-chan T) {
	closedChan := make(chan ([]<-chan T))
	go func() {
		defer trace.Logf(context.Background(), "dynselect", "shut down")
		defer close(closedChan)
		canReturn := false
		supervised := make([]<-chan T, 0)
		var emit chan ([]<-chan T)
		var emitValue []<-chan T
		var cases []reflect.SelectCase
		for {
			reg := trace.StartRegion(context.Background(), "dynselect build")
			// Reuse the memory for cases. First clear it, then grow it if necessary, then
			// set the length to be what we need right now.
			for i := range cases {
				// Zero all elements first.
				cases[i] = reflect.SelectCase{}
			}
			targetSize := len(supervised) + 2
			if targetSize > len(cases) {
				cases = slices.Grow(cases, targetSize-len(cases))
			}
			cases = cases[:targetSize]

			recvNewChanIndex := len(supervised)
			cases[recvNewChanIndex] = reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(add),
			}
			sendClosedChanIndex := len(supervised) + 1
			cases[sendClosedChanIndex] = reflect.SelectCase{
				Dir:  reflect.SelectSend,
				Chan: reflect.ValueOf(emit),
				Send: reflect.ValueOf(emitValue),
			}
			for i, supervisedChan := range supervised {
				cases[i] = reflect.SelectCase{
					Dir:  reflect.SelectRecv,
					Chan: reflect.ValueOf(supervisedChan),
				}
			}
			trace.Logf(context.Background(), "dynselect", "built %d dynamic cases", len(supervised))
			reg.End()

			reg = trace.StartRegion(context.Background(), "dynselect select")
			chosen, val, open := reflect.Select(cases)
			reg.End()
			reg = trace.StartRegion(context.Background(), "dynselect process")
			if chosen < len(supervised) {
				if !open {
					// Send the supervised channel on the closed channel and remove it from further
					// processing.
					emitValue = append(emitValue, supervised[chosen])
					trace.Logf(context.Background(), "dynselect", "detected close of channel %p at index %d", supervised[chosen], chosen)
					emit = closedChan
					supervised[chosen] = nil
					supervised = slices.Delete(supervised, chosen, chosen+1)
				}
			} else if chosen == recvNewChanIndex {
				if open {
					// Add the new channel to the supervised group.
					asChan := val.Interface().(<-chan T)
					supervised = append(supervised, asChan)
					trace.Logf(context.Background(), "dynselect", "added chan %p at index %d", asChan, len(supervised))
				} else {
					// Our input channel closed, so we're done.
					canReturn = true
					// Nil out the input channel so that its select case never fires again, otherwise we'll spin in a busy
					// loop until shutdown.
					add = nil
				}
			} else if chosen == sendClosedChanIndex {
				// Disable the send case until a new channel closes.
				trace.Logf(context.Background(), "dynselect", "sent close notification for channels %v", emitValue)
				emit = nil
				emitValue = nil
			}
			reg.End()
			// return once our input closes and all supervised channels are closed and emitted.
			if canReturn && len(supervised) == 0 && len(emitValue) == 0 {
				return
			}
		}
	}()
	return closedChan
}