Implement graph.MapReduce

This commit is contained in:
Brian Picciano 2021-12-30 11:16:08 -07:00
parent 2a96e9a593
commit 223b7f93a5
2 changed files with 285 additions and 0 deletions

View File

@ -283,3 +283,140 @@ outer:
return true return true
} }
func mapReduce[Ea, Va Value, Vb any](
root *OpenEdge[Ea, Va],
mapVal func(Va) (Vb, error),
reduceEdge func(*OpenEdge[Ea, Va], []Vb) (Vb, error),
) (
Vb, error,
){
if valA, ok := root.FromValue(); ok {
valB, err := mapVal(valA)
if err != nil {
var zero Vb
return zero, err
}
return reduceEdge(root, []Vb{valB})
}
tupA, _ := root.FromTuple()
valsB := make([]Vb, len(tupA))
for i := range tupA {
valB, err := mapReduce[Ea, Va, Vb](
tupA[i], mapVal, reduceEdge,
)
if err != nil {
var zero Vb
return zero, err
}
valsB[i] = valB
}
return reduceEdge(root, valsB)
}
type mappedVal[Va Value, Vb any] struct {
valA Va
valB Vb // result
}
type reducedEdge[Ea, Va Value, Vb any] struct {
edgeA *OpenEdge[Ea, Va]
valB Vb // result
}
// MapReduce recursively computes a resultant Value of type Vb from an
// OpenEdge[Ea, Va].
//
// Tuple edges which are encountered will have Reduce called on each OpenEdge
// branch of the tuple, to obtain a Vb for each branch. The edge value of the
// tuple edge (Ea) and the just obtained Vbs are then passed to reduceEdge to
// obtain a Vb for that edge.
//
// The values of value edges (Va) which are encountered are mapped to Vb using
// the mapVal function. The edge value of those value edges (Ea) and the just
// obtained Vb value are then passed to reduceEdge to obtain a Vb for that edge.
//
// If either the map or reduce function returns an error then processing is
// immediately cancelled and that error is returned directly.
//
// If a value or edge is connected to multiple times within the root OpenEdge it
// will only be mapped/reduced a single time, and the result of that single
// map/reduction will be passed to each dependant operation.
//
func MapReduce[Ea, Va Value, Vb any](
root *OpenEdge[Ea, Va],
mapVal func(Va) (Vb, error),
reduceEdge func(Ea, []Vb) (Vb, error),
) (
Vb, error,
){
var (
zeroB Vb
// we use these to memoize reductions on values and edges, so a
// reduction is only performed a single time for each value/edge.
//
// NOTE this is not implemented very efficiently.
mappedVals []mappedVal[Va, Vb]
reducedEdges []reducedEdge[Ea, Va, Vb]
)
return mapReduce[Ea, Va, Vb](
root,
func(valA Va) (Vb, error) {
for _, mappedVal := range mappedVals {
if mappedVal.valA.Equal(valA) {
return mappedVal.valB, nil
}
}
valB, err := mapVal(valA)
if err != nil {
return zeroB, err
}
mappedVals = append(mappedVals, mappedVal[Va, Vb]{
valA: valA,
valB: valB,
})
return valB, nil
},
func(edgeA *OpenEdge[Ea, Va], valBs []Vb) (Vb, error) {
for _, reducedEdge := range reducedEdges {
if reducedEdge.edgeA.equal(edgeA) {
return reducedEdge.valB, nil
}
}
valB, err := reduceEdge(edgeA.EdgeValue(), valBs)
if err != nil {
return zeroB, err
}
reducedEdges = append(reducedEdges, reducedEdge[Ea, Va, Vb]{
edgeA: edgeA,
valB: valB,
})
return valB, nil
},
)
}

View File

@ -1,6 +1,8 @@
package graph package graph
import ( import (
"fmt"
"errors"
"strconv" "strconv"
"testing" "testing"
@ -13,6 +15,12 @@ func (s S) Equal(s2 Value) bool { return s == s2.(S) }
func (s S) String() string { return string(s) } func (s S) String() string { return string(s) }
type I int
func (i I) Equal(i2 Value) bool { return i == i2.(I) }
func (i I) String() string { return strconv.Itoa(int(i)) }
func TestEqual(t *testing.T) { func TestEqual(t *testing.T) {
var ( var (
@ -118,3 +126,143 @@ func TestEqual(t *testing.T) {
}) })
} }
} }
type mapReduceTestEdge struct {
name string
fn func([]int) int
done bool
}
func (e *mapReduceTestEdge) Equal(e2i Value) bool {
e2, _ := e2i.(*mapReduceTestEdge)
if e == nil || e2 == nil {
return e == e2
}
return e.name == e2.name
}
func (e *mapReduceTestEdge) String() string {
return e.name
}
func (e *mapReduceTestEdge) do(ii []int) int {
if e.done {
panic(fmt.Sprintf("%q already done", e.name))
}
e.done = true
return e.fn(ii)
}
func TestMapReduce(t *testing.T) {
type (
Va = I
Vb = int
Ea = *mapReduceTestEdge
edge = OpenEdge[Ea, Va]
)
var (
zeroVb Vb
)
vOut := func(edge Ea, val Va) *edge {
return ValueOut[Ea, Va](edge, val)
}
tOut := func(edge Ea, ins ...*edge) *edge {
return TupleOut[Ea, Va](edge, ins...)
}
add := func() *mapReduceTestEdge{
return &mapReduceTestEdge{
name: "add",
fn: func(ii []int) int {
var n int
for _, i := range ii {
n += i
}
return n
},
}
}
mul := func() *mapReduceTestEdge{
return &mapReduceTestEdge{
name: "mul",
fn: func(ii []int) int {
n := 1
for _, i := range ii {
n *= i
}
return n
},
}
}
mapVal := func(valA Va) (Vb, error) {
return Vb(valA * 10), nil
}
reduceEdge := func(edgeA Ea, valBs []Vb) (Vb, error) {
if edgeA == nil {
if len(valBs) == 1 {
return valBs[0], nil
}
return zeroVb, errors.New("tuple edge must have edge value")
}
return edgeA.do(valBs), nil
}
tests := []struct {
in *edge
exp int
}{
{
in: vOut(nil, 1),
exp: 10,
},
{
in: vOut(add(), 1),
exp: 10,
},
{
in: tOut(
add(),
vOut(nil, 1),
vOut(add(), 2),
vOut(mul(), 3),
),
exp: 60,
},
{
// duplicate edges and values getting used twice, each should only
// get eval'd once
in: tOut(
add(),
tOut(add(), vOut(nil, 1), vOut(nil, 2)),
tOut(add(), vOut(nil, 1), vOut(nil, 2)),
tOut(add(), vOut(nil, 3), vOut(nil, 3)),
),
exp: 120,
},
}
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
got, err := MapReduce(test.in, mapVal, reduceEdge)
assert.NoError(t, err)
assert.Equal(t, test.exp, got)
})
}
}