From 223b7f93a5f5dfba160cf84e8621ea17322f9405 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Thu, 30 Dec 2021 11:16:08 -0700 Subject: [PATCH] Implement graph.MapReduce --- graph/graph.go | 137 ++++++++++++++++++++++++++++++++++++++++ graph/graph_test.go | 148 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) diff --git a/graph/graph.go b/graph/graph.go index df4eba6..92202d6 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -283,3 +283,140 @@ outer: return true } + + +func mapReduce[Ea, Va Value, Vb any]( + root *OpenEdge[Ea, Va], + mapVal func(Va) (Vb, error), + reduceEdge func(*OpenEdge[Ea, Va], []Vb) (Vb, error), +) ( + Vb, error, +){ + + if valA, ok := root.FromValue(); ok { + + valB, err := mapVal(valA) + + if err != nil { + var zero Vb + return zero, err + } + + return reduceEdge(root, []Vb{valB}) + } + + tupA, _ := root.FromTuple() + + valsB := make([]Vb, len(tupA)) + + for i := range tupA { + + valB, err := mapReduce[Ea, Va, Vb]( + tupA[i], mapVal, reduceEdge, + ) + + if err != nil { + var zero Vb + return zero, err + } + + valsB[i] = valB + } + + return reduceEdge(root, valsB) +} + +type mappedVal[Va Value, Vb any] struct { + valA Va + valB Vb // result +} + +type reducedEdge[Ea, Va Value, Vb any] struct { + edgeA *OpenEdge[Ea, Va] + valB Vb // result +} + +// MapReduce recursively computes a resultant Value of type Vb from an +// OpenEdge[Ea, Va]. +// +// Tuple edges which are encountered will have Reduce called on each OpenEdge +// branch of the tuple, to obtain a Vb for each branch. The edge value of the +// tuple edge (Ea) and the just obtained Vbs are then passed to reduceEdge to +// obtain a Vb for that edge. +// +// The values of value edges (Va) which are encountered are mapped to Vb using +// the mapVal function. The edge value of those value edges (Ea) and the just +// obtained Vb value are then passed to reduceEdge to obtain a Vb for that edge. +// +// If either the map or reduce function returns an error then processing is +// immediately cancelled and that error is returned directly. +// +// If a value or edge is connected to multiple times within the root OpenEdge it +// will only be mapped/reduced a single time, and the result of that single +// map/reduction will be passed to each dependant operation. +// +func MapReduce[Ea, Va Value, Vb any]( + root *OpenEdge[Ea, Va], + mapVal func(Va) (Vb, error), + reduceEdge func(Ea, []Vb) (Vb, error), +) ( + Vb, error, +){ + + var ( + zeroB Vb + + // we use these to memoize reductions on values and edges, so a + // reduction is only performed a single time for each value/edge. + // + // NOTE this is not implemented very efficiently. + mappedVals []mappedVal[Va, Vb] + reducedEdges []reducedEdge[Ea, Va, Vb] + ) + + return mapReduce[Ea, Va, Vb]( + root, + func(valA Va) (Vb, error) { + + for _, mappedVal := range mappedVals { + if mappedVal.valA.Equal(valA) { + return mappedVal.valB, nil + } + } + + valB, err := mapVal(valA) + + if err != nil { + return zeroB, err + } + + mappedVals = append(mappedVals, mappedVal[Va, Vb]{ + valA: valA, + valB: valB, + }) + + return valB, nil + }, + func(edgeA *OpenEdge[Ea, Va], valBs []Vb) (Vb, error) { + + for _, reducedEdge := range reducedEdges { + if reducedEdge.edgeA.equal(edgeA) { + return reducedEdge.valB, nil + } + } + + valB, err := reduceEdge(edgeA.EdgeValue(), valBs) + + if err != nil { + return zeroB, err + } + + reducedEdges = append(reducedEdges, reducedEdge[Ea, Va, Vb]{ + edgeA: edgeA, + valB: valB, + }) + + return valB, nil + }, + ) +} diff --git a/graph/graph_test.go b/graph/graph_test.go index 816503d..33e682b 100644 --- a/graph/graph_test.go +++ b/graph/graph_test.go @@ -1,6 +1,8 @@ package graph import ( + "fmt" + "errors" "strconv" "testing" @@ -13,6 +15,12 @@ func (s S) Equal(s2 Value) bool { return s == s2.(S) } func (s S) String() string { return string(s) } +type I int + +func (i I) Equal(i2 Value) bool { return i == i2.(I) } + +func (i I) String() string { return strconv.Itoa(int(i)) } + func TestEqual(t *testing.T) { var ( @@ -118,3 +126,143 @@ func TestEqual(t *testing.T) { }) } } + +type mapReduceTestEdge struct { + name string + fn func([]int) int + done bool +} + +func (e *mapReduceTestEdge) Equal(e2i Value) bool { + + e2, _ := e2i.(*mapReduceTestEdge) + + if e == nil || e2 == nil { + return e == e2 + } + + return e.name == e2.name +} + +func (e *mapReduceTestEdge) String() string { + return e.name +} + +func (e *mapReduceTestEdge) do(ii []int) int { + + if e.done { + panic(fmt.Sprintf("%q already done", e.name)) + } + + e.done = true + + return e.fn(ii) +} + +func TestMapReduce(t *testing.T) { + + type ( + Va = I + Vb = int + Ea = *mapReduceTestEdge + edge = OpenEdge[Ea, Va] + ) + + var ( + zeroVb Vb + ) + + vOut := func(edge Ea, val Va) *edge { + return ValueOut[Ea, Va](edge, val) + } + + tOut := func(edge Ea, ins ...*edge) *edge { + return TupleOut[Ea, Va](edge, ins...) + } + + add := func() *mapReduceTestEdge{ + return &mapReduceTestEdge{ + name: "add", + fn: func(ii []int) int { + var n int + for _, i := range ii { + n += i + } + return n + }, + } + } + + mul := func() *mapReduceTestEdge{ + return &mapReduceTestEdge{ + name: "mul", + fn: func(ii []int) int { + n := 1 + for _, i := range ii { + n *= i + } + return n + }, + } + } + + mapVal := func(valA Va) (Vb, error) { + return Vb(valA * 10), nil + } + + reduceEdge := func(edgeA Ea, valBs []Vb) (Vb, error) { + + if edgeA == nil { + + if len(valBs) == 1 { + return valBs[0], nil + } + + return zeroVb, errors.New("tuple edge must have edge value") + } + + return edgeA.do(valBs), nil + } + + tests := []struct { + in *edge + exp int + }{ + { + in: vOut(nil, 1), + exp: 10, + }, + { + in: vOut(add(), 1), + exp: 10, + }, + { + in: tOut( + add(), + vOut(nil, 1), + vOut(add(), 2), + vOut(mul(), 3), + ), + exp: 60, + }, + { + // duplicate edges and values getting used twice, each should only + // get eval'd once + in: tOut( + add(), + tOut(add(), vOut(nil, 1), vOut(nil, 2)), + tOut(add(), vOut(nil, 1), vOut(nil, 2)), + tOut(add(), vOut(nil, 3), vOut(nil, 3)), + ), + exp: 120, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + got, err := MapReduce(test.in, mapVal, reduceEdge) + assert.NoError(t, err) + assert.Equal(t, test.exp, got) + }) + } +}