Implement garage layout manager

This commit is contained in:
Brian Picciano 2025-01-07 15:39:26 +01:00
parent e9f318f34c
commit 24ac619798
4 changed files with 571 additions and 0 deletions

View File

@ -0,0 +1,168 @@
// Package glm, "garage layout manager", implements the transition of garage
// layout states based on the currently active layout and the desired layout
// defined by the user.
package glm
import (
"context"
"errors"
"fmt"
"io/fs"
"isle/daemon/daecommon"
"isle/garage"
"isle/jsonutil"
"isle/toolkit"
"net/netip"
"path/filepath"
)
// GarageLayoutManager (GLM) tracks the currently active set of storage
// allocations and calculates actions required to transition from the active set
// into a target set.
//
// GLM will make sure that when allocations are removed they are properly
// drained prior to being fully removed from the cluster.
type GarageLayoutManager interface {
// GetActiveAllocations returns the currently active set of allocations, as
// recorded by the last call to CommitStateTransition. Returns an empty set
// if CommitStateTransition has never been called.
GetActiveAllocations(context.Context) (
[]daecommon.ConfigStorageAllocation, error,
)
// SetActiveAllocations overwrites the stored active allocations, if any, to
// the given one.
SetActiveAllocations(context.Context, []daecommon.ConfigStorageAllocation) error
// Validate checks the target allocation set for any inconsistencies with
// the currently active one, and returns an error if one is found.
Validate(
_ context.Context, targetAllocs []daecommon.ConfigStorageAllocation,
) error
// CalculateStateTransition accepts a set of known nodes from an up-to-date
// [garage.ClusterStatus] and the target allocation set for the host, and
// returns a StateTransition describing the actions which should be taken.
//
// If the host is not running any garage instances, and therefore cannot
// determine the known nodes, nil should be passed instead.
CalculateStateTransition(
_ context.Context,
knownNodes []garage.KnownNode,
targetAllocs []daecommon.ConfigStorageAllocation,
) (
StateTransition, error,
)
// CommitStateTransition should be called after the CalculateStateTransition
// returns a StateTransition, and the StateTransition's prescribed actions
// have been successfully carried out.
CommitStateTransition(context.Context, StateTransition) error
}
type garageLayoutManager struct {
dir toolkit.Dir
hostIP netip.Addr
}
// NewGarageLayoutManager initializes and returns a GarageLayoutManager which
// will use the given directory to store state, and which is managing the layout
// for the host with the given IP.
func NewGarageLayoutManager(
dir toolkit.Dir, hostIP netip.Addr,
) GarageLayoutManager {
return &garageLayoutManager{dir, hostIP}
}
const glmStateFile = "glm.json"
type glmState struct {
ActiveAllocations []daecommon.ConfigStorageAllocation
}
func (glm *garageLayoutManager) get() (glmState, error) {
var (
path = filepath.Join(glm.dir.Path, glmStateFile)
state glmState
)
err := jsonutil.LoadFile(&state, path)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return glmState{}, err
}
return state, nil
}
func (glm *garageLayoutManager) set(state glmState) error {
path := filepath.Join(glm.dir.Path, glmStateFile)
return jsonutil.WriteFile(state, path, 0600)
}
func (glm *garageLayoutManager) GetActiveAllocations(context.Context) (
[]daecommon.ConfigStorageAllocation, error,
) {
state, err := glm.get()
return state.ActiveAllocations, err
}
func (glm *garageLayoutManager) SetActiveAllocations(
_ context.Context,
allocs []daecommon.ConfigStorageAllocation,
) error {
return glm.set(glmState{allocs})
}
func (glm *garageLayoutManager) Validate(
_ context.Context, targetAllocs []daecommon.ConfigStorageAllocation,
) error {
state, err := glm.get()
if err != nil {
return fmt.Errorf("reading state: %w", err)
}
return validateTargetAllocs(
state.ActiveAllocations,
targetAllocs,
)
}
func (glm *garageLayoutManager) CalculateStateTransition(
_ context.Context,
knownNodes []garage.KnownNode,
targetAllocs []daecommon.ConfigStorageAllocation,
) (
StateTransition, error,
) {
state, err := glm.get()
if err != nil {
return StateTransition{}, fmt.Errorf("reading state: %w", err)
}
err = validateTargetAllocs(state.ActiveAllocations, targetAllocs)
if err != nil {
return StateTransition{}, fmt.Errorf(
"validating target allocations: %w", err,
)
}
allKnownNodes, knownNodes := knownNodes, knownNodes[:0]
for _, node := range allKnownNodes {
if node.Addr.Addr() == glm.hostIP {
knownNodes = append(knownNodes, node)
}
}
return calcStateTransition(
state.ActiveAllocations, knownNodes, targetAllocs,
), nil
}
func (glm *garageLayoutManager) CommitStateTransition(
_ context.Context, stateTransition StateTransition,
) error {
return glm.set(glmState{
ActiveAllocations: stateTransition.ActiveAllocations(),
})
}

View File

@ -0,0 +1,144 @@
package glm
import (
"isle/daemon/daecommon"
"isle/garage"
)
func allocsByRPCPort(
allocs []daecommon.ConfigStorageAllocation,
) map[int]daecommon.ConfigStorageAllocation {
m := map[int]daecommon.ConfigStorageAllocation{}
for _, alloc := range allocs {
m[alloc.RPCPort] = alloc
}
return m
}
// AllocationWithKnownNode pairs an active storage allocation with its
// corresponding status information from garage.
type AllocationWithKnownNode struct {
garage.KnownNode
daecommon.ConfigStorageAllocation
}
func allocsWithKnownNode(
allocs []daecommon.ConfigStorageAllocation,
knownNodes []garage.KnownNode,
) (
res []AllocationWithKnownNode,
) {
outer:
for _, alloc := range allocs {
for _, knownNode := range knownNodes {
if alloc.RPCPort == int(knownNode.Addr.Port()) {
res = append(res, AllocationWithKnownNode{knownNode, alloc})
continue outer
}
}
res = append(res, AllocationWithKnownNode{ConfigStorageAllocation: alloc})
}
return
}
// StateTransition describes the allocation changes which should be made as a
// result of the current state of the cluster and the target allocation state.
type StateTransition struct {
// AddModifyAllocations should be added to the garage cluster layout, if
// they are not already.
AddModifyAllocations []daecommon.ConfigStorageAllocation
// DrainAllocations should be "removed" from the garage cluster layout, if
// the are not already. In reality "removing" from the garage cluster
// layout only removes the node's role, and puts it into the draining state.
// The node itself remains active.
DrainAllocations []AllocationWithKnownNode
}
// DrainAllocationIDs returns the IDs of all allocations in the DrainAllocations
// field.
func (st StateTransition) DrainAllocationIDs() []string {
ids := make([]string, len(st.DrainAllocations))
for i, alloc := range st.DrainAllocations {
ids[i] = alloc.ID
}
return ids
}
// ActiveAllocations returns all allocations which should be active, ie should
// have corresponding garage instances.
func (st StateTransition) ActiveAllocations() []daecommon.ConfigStorageAllocation {
allocs := make(
[]daecommon.ConfigStorageAllocation,
0,
len(st.AddModifyAllocations)+len(st.DrainAllocations),
)
for _, alloc := range st.AddModifyAllocations {
allocs = append(allocs, alloc)
}
for _, alloc := range st.DrainAllocations {
allocs = append(allocs, alloc.ConfigStorageAllocation)
}
return allocs
}
// calcStateTransition calculates the StateTransition which should be made given
// the current state of the garage layout.
//
// knownNodes is the set of KnownNode values returned from a
// [garage.ClusterStatus], filtered to only include those pertinent to this
// host.
//
// It is assumed that the active/targetAllocs pair has already been
// validated using validateTargetAllocs.
func calcStateTransition(
activeAllocs []daecommon.ConfigStorageAllocation,
knownNodes []garage.KnownNode,
targetAllocs []daecommon.ConfigStorageAllocation,
) (
res StateTransition,
) {
var (
activeAllocsWithKnownNodes = allocsWithKnownNode(
activeAllocs, knownNodes,
)
targetAllocsM = allocsByRPCPort(targetAllocs)
)
// Deal with additions/modifications to the cluster. These are easy, since
// all target allocations belong in the cluster no matter what.
for _, targetAlloc := range targetAllocs {
res.AddModifyAllocations = append(res.AddModifyAllocations, targetAlloc)
}
// Deal with removals from the cluster. These are harder.
for _, activeAlloc := range activeAllocsWithKnownNodes {
// If there's a corresponding targetAlloc then this alloc was added to
// AddModifyAllocations already. Even if it was previously draining,
// once it's added/modified it won't be anymore.
if _, ok := targetAllocsM[activeAlloc.RPCPort]; ok {
continue
}
// If it's draining then let it continue draining.
//
// If there's an associated role then this alloc is still part of the
// cluster, but it's not in the target set so it needs to be drained
// and removed.
if activeAlloc.Draining || activeAlloc.Role != nil {
res.DrainAllocations = append(res.DrainAllocations, activeAlloc)
continue
}
// The alloc is not in the target alloc set, it's not draining, and it
// has no role in the cluster.
}
return
}

View File

@ -0,0 +1,69 @@
package glm
import (
"errors"
"fmt"
"io/fs"
"isle/daemon/daecommon"
"os"
)
// targetAllocs is expected to have already been validated to be internally
// consistent, ie on conflicts between ports or directories within itself.
func validateTargetAllocs(
activeAllocs, targetAllocs []daecommon.ConfigStorageAllocation,
) error {
for _, targetAlloc := range targetAllocs {
var alreadyActive bool
for _, activeAlloc := range activeAllocs {
switch {
case targetAlloc.RPCPort == activeAlloc.RPCPort &&
targetAlloc.DataPath == activeAlloc.DataPath &&
targetAlloc.MetaPath == activeAlloc.MetaPath:
alreadyActive = true
break
// Changing the value of all fields besides these is supported.
case targetAlloc.RPCPort == activeAlloc.RPCPort:
return fmt.Errorf(
"RPC port %d is already being used", activeAlloc.RPCPort,
)
case targetAlloc.DataPath == activeAlloc.DataPath:
return fmt.Errorf(
"Data path %q is already being used", activeAlloc.DataPath,
)
case targetAlloc.MetaPath == activeAlloc.MetaPath:
return fmt.Errorf(
"Meta path %q is already being used", activeAlloc.MetaPath,
)
}
}
if !alreadyActive {
for _, dir := range []string{
targetAlloc.DataPath, targetAlloc.MetaPath,
} {
entries, err := os.ReadDir(dir)
if errors.Is(err, fs.ErrNotExist) {
continue
} else if err != nil {
return fmt.Errorf("reading contents of %q: %w", dir, err)
}
if len(entries) > 0 {
return fmt.Errorf(
"directory %q is already being used, please provide the path to an empty directory",
dir,
)
}
}
}
// targetAlloc validated
}
return nil
}

View File

@ -0,0 +1,190 @@
package glm
import (
"isle/daemon/daecommon"
"os"
"path/filepath"
"regexp"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_validateTargetAllocs(t *testing.T) {
t.Parallel()
tests := []struct {
label string
mkDir, mkFile string
activeAllocs, targetAllocs []daecommon.ConfigStorageAllocation
wantErr string
}{
{
label: "empty/empty",
},
{
label: "empty/single",
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
},
{
label: "empty/multiple",
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
},
},
{
label: "no change",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
},
},
{
label: "change capacity and add alloc",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 2, RPCPort: 3900},
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
},
},
{
label: "change rpc port",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3902},
{DataPath: "A1", MetaPath: "A2", Capacity: 2, RPCPort: 3901},
},
wantErr: `Data path ".+/A1" is already being used`,
},
{
label: "change data path",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
{DataPath: "Aa", MetaPath: "A2", Capacity: 2, RPCPort: 3900},
},
wantErr: `RPC port 3900 is already being used`,
},
{
label: "change meta path",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
{DataPath: "A1", MetaPath: "Aa", Capacity: 2, RPCPort: 3900},
},
wantErr: `RPC port 3900 is already being used`,
},
{
label: "conflict rpc port",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3900},
},
wantErr: `RPC port 3900 is already being used`,
},
{
label: "conflict data path",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
},
wantErr: `Data path ".+/A1" is already being used`,
},
{
label: "conflict meta path",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "B1", MetaPath: "A2", Capacity: 1, RPCPort: 3901},
},
wantErr: `Meta path ".+/A2" is already being used`,
},
{
label: "directory/empty",
mkDir: "A1",
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
},
{
label: "directory/with file",
mkFile: "A1/some file",
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
wantErr: `directory ".+/A1" is already being used`,
},
{
label: "directory/active alloc with file",
mkFile: "A1/some file",
activeAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
targetAllocs: []daecommon.ConfigStorageAllocation{
{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
},
},
}
for _, test := range tests {
t.Run(test.label, func(t *testing.T) {
dir := t.TempDir()
joinRoot := func(allocs []daecommon.ConfigStorageAllocation) {
for i := range allocs {
allocs[i].DataPath = filepath.Join(dir, allocs[i].DataPath)
allocs[i].MetaPath = filepath.Join(dir, allocs[i].MetaPath)
}
}
joinRoot(test.activeAllocs)
joinRoot(test.targetAllocs)
if test.mkDir != "" {
path := filepath.Join(dir, test.mkDir)
require.NoError(t, os.MkdirAll(path, 0755))
}
if test.mkFile != "" {
path := filepath.Join(dir, test.mkFile)
dirPath := filepath.Dir(path)
require.NoError(t, os.MkdirAll(dirPath, 0755))
require.NoError(t, os.WriteFile(path, []byte("HI"), 0644))
}
err := validateTargetAllocs(test.activeAllocs, test.targetAllocs)
if test.wantErr == "" {
assert.NoError(t, err)
} else {
wantErrRegexp := regexp.MustCompile(test.wantErr)
assert.True(
t,
wantErrRegexp.MatchString(err.Error()),
"wantErr: %s\nerr:%s",
test.wantErr,
err,
)
}
})
}
}