From 24ac619798253d06eb6a9de36f57f079bef36604 Mon Sep 17 00:00:00 2001
From: Brian Picciano
Date: Tue, 7 Jan 2025 15:39:26 +0100
Subject: [PATCH] Implement garage layout manager

---
 go/daemon/network/glm/glm.go              | 168 +++++++++++++++++++
 go/daemon/network/glm/state_transition.go | 144 ++++++++++++++++
 go/daemon/network/glm/validate.go         |  69 ++++++++
 go/daemon/network/glm/validate_test.go    | 190 ++++++++++++++++++++++
 4 files changed, 571 insertions(+)
 create mode 100644 go/daemon/network/glm/glm.go
 create mode 100644 go/daemon/network/glm/state_transition.go
 create mode 100644 go/daemon/network/glm/validate.go
 create mode 100644 go/daemon/network/glm/validate_test.go

diff --git a/go/daemon/network/glm/glm.go b/go/daemon/network/glm/glm.go
new file mode 100644
index 0000000..5d246b3
--- /dev/null
+++ b/go/daemon/network/glm/glm.go
@@ -0,0 +1,168 @@
+// Package glm, "garage layout manager", implements the transition of garage
+// layout states based on the currently active layout and the desired layout
+// defined by the user.
+package glm
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/fs"
+	"isle/daemon/daecommon"
+	"isle/garage"
+	"isle/jsonutil"
+	"isle/toolkit"
+	"net/netip"
+	"path/filepath"
+)
+
+// GarageLayoutManager (GLM) tracks the currently active set of storage
+// allocations and calculates the actions required to transition from the
+// active set to a target set.
+//
+// GLM will make sure that when allocations are removed they are properly
+// drained prior to being fully removed from the cluster.
+type GarageLayoutManager interface {
+
+	// GetActiveAllocations returns the currently active set of allocations, as
+	// recorded by the last call to CommitStateTransition. Returns an empty set
+	// if CommitStateTransition has never been called.
+	GetActiveAllocations(context.Context) (
+		[]daecommon.ConfigStorageAllocation, error,
+	)
+
+	// SetActiveAllocations overwrites the stored active allocations, if any,
+	// with the given set.
+	SetActiveAllocations(context.Context, []daecommon.ConfigStorageAllocation) error
+
+	// Validate checks the target allocation set for any inconsistencies with
+	// the currently active one, and returns an error if one is found.
+	Validate(
+		_ context.Context, targetAllocs []daecommon.ConfigStorageAllocation,
+	) error
+
+	// CalculateStateTransition accepts a set of known nodes from an up-to-date
+	// [garage.ClusterStatus] and the target allocation set for the host, and
+	// returns a StateTransition describing the actions which should be taken.
+	//
+	// If the host is not running any garage instances, and therefore cannot
+	// determine the known nodes, nil should be passed instead.
+	CalculateStateTransition(
+		_ context.Context,
+		knownNodes []garage.KnownNode,
+		targetAllocs []daecommon.ConfigStorageAllocation,
+	) (
+		StateTransition, error,
+	)
+
+	// CommitStateTransition should be called after CalculateStateTransition
+	// returns a StateTransition, and the StateTransition's prescribed actions
+	// have been successfully carried out.
+	CommitStateTransition(context.Context, StateTransition) error
+}
+
+type garageLayoutManager struct {
+	dir    toolkit.Dir
+	hostIP netip.Addr
+}
+
+// NewGarageLayoutManager initializes and returns a GarageLayoutManager which
+// will use the given directory to store state, and which manages the layout
+// for the host with the given IP.
+func NewGarageLayoutManager( + dir toolkit.Dir, hostIP netip.Addr, +) GarageLayoutManager { + return &garageLayoutManager{dir, hostIP} +} + +const glmStateFile = "glm.json" + +type glmState struct { + ActiveAllocations []daecommon.ConfigStorageAllocation +} + +func (glm *garageLayoutManager) get() (glmState, error) { + var ( + path = filepath.Join(glm.dir.Path, glmStateFile) + state glmState + ) + + err := jsonutil.LoadFile(&state, path) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return glmState{}, err + } + + return state, nil +} + +func (glm *garageLayoutManager) set(state glmState) error { + path := filepath.Join(glm.dir.Path, glmStateFile) + return jsonutil.WriteFile(state, path, 0600) +} + +func (glm *garageLayoutManager) GetActiveAllocations(context.Context) ( + []daecommon.ConfigStorageAllocation, error, +) { + state, err := glm.get() + return state.ActiveAllocations, err +} + +func (glm *garageLayoutManager) SetActiveAllocations( + _ context.Context, + allocs []daecommon.ConfigStorageAllocation, +) error { + return glm.set(glmState{allocs}) +} + +func (glm *garageLayoutManager) Validate( + _ context.Context, targetAllocs []daecommon.ConfigStorageAllocation, +) error { + state, err := glm.get() + if err != nil { + return fmt.Errorf("reading state: %w", err) + } + + return validateTargetAllocs( + state.ActiveAllocations, + targetAllocs, + ) +} + +func (glm *garageLayoutManager) CalculateStateTransition( + _ context.Context, + knownNodes []garage.KnownNode, + targetAllocs []daecommon.ConfigStorageAllocation, +) ( + StateTransition, error, +) { + state, err := glm.get() + if err != nil { + return StateTransition{}, fmt.Errorf("reading state: %w", err) + } + + err = validateTargetAllocs(state.ActiveAllocations, targetAllocs) + if err != nil { + return StateTransition{}, fmt.Errorf( + "validating target allocations: %w", err, + ) + } + + allKnownNodes, knownNodes := knownNodes, knownNodes[:0] + for _, node := range allKnownNodes { + if node.Addr.Addr() == glm.hostIP { + knownNodes = append(knownNodes, node) + } + } + + return calcStateTransition( + state.ActiveAllocations, knownNodes, targetAllocs, + ), nil +} + +func (glm *garageLayoutManager) CommitStateTransition( + _ context.Context, stateTransition StateTransition, +) error { + return glm.set(glmState{ + ActiveAllocations: stateTransition.ActiveAllocations(), + }) +} diff --git a/go/daemon/network/glm/state_transition.go b/go/daemon/network/glm/state_transition.go new file mode 100644 index 0000000..ace218a --- /dev/null +++ b/go/daemon/network/glm/state_transition.go @@ -0,0 +1,144 @@ +package glm + +import ( + "isle/daemon/daecommon" + "isle/garage" +) + +func allocsByRPCPort( + allocs []daecommon.ConfigStorageAllocation, +) map[int]daecommon.ConfigStorageAllocation { + m := map[int]daecommon.ConfigStorageAllocation{} + for _, alloc := range allocs { + m[alloc.RPCPort] = alloc + } + return m +} + +// AllocationWithKnownNode pairs an active storage allocation with its +// corresponding status information from garage. 
+type AllocationWithKnownNode struct {
+	garage.KnownNode
+	daecommon.ConfigStorageAllocation
+}
+
+func allocsWithKnownNode(
+	allocs []daecommon.ConfigStorageAllocation,
+	knownNodes []garage.KnownNode,
+) (
+	res []AllocationWithKnownNode,
+) {
+outer:
+	for _, alloc := range allocs {
+		for _, knownNode := range knownNodes {
+			if alloc.RPCPort == int(knownNode.Addr.Port()) {
+				res = append(res, AllocationWithKnownNode{knownNode, alloc})
+				continue outer
+			}
+		}
+
+		res = append(res, AllocationWithKnownNode{ConfigStorageAllocation: alloc})
+	}
+
+	return
+}
+
+// StateTransition describes the allocation changes which should be made as a
+// result of the current state of the cluster and the target allocation state.
+type StateTransition struct {
+	// AddModifyAllocations should be added to the garage cluster layout, if
+	// they are not already.
+	AddModifyAllocations []daecommon.ConfigStorageAllocation
+
+	// DrainAllocations should be "removed" from the garage cluster layout, if
+	// they are not already. In reality "removing" from the garage cluster
+	// layout only removes the node's role, and puts it into the draining state.
+	// The node itself remains active.
+	DrainAllocations []AllocationWithKnownNode
+}
+
+// DrainAllocationIDs returns the IDs of all allocations in the DrainAllocations
+// field.
+func (st StateTransition) DrainAllocationIDs() []string {
+	ids := make([]string, len(st.DrainAllocations))
+	for i, alloc := range st.DrainAllocations {
+		ids[i] = alloc.ID
+	}
+	return ids
+}
+
+// ActiveAllocations returns all allocations which should be active, ie should
+// have corresponding garage instances.
+func (st StateTransition) ActiveAllocations() []daecommon.ConfigStorageAllocation {
+	allocs := make(
+		[]daecommon.ConfigStorageAllocation,
+		0,
+		len(st.AddModifyAllocations)+len(st.DrainAllocations),
+	)
+
+	for _, alloc := range st.AddModifyAllocations {
+		allocs = append(allocs, alloc)
+	}
+
+	for _, alloc := range st.DrainAllocations {
+		allocs = append(allocs, alloc.ConfigStorageAllocation)
+	}
+
+	return allocs
+}
+
+// calcStateTransition calculates the StateTransition which should be made given
+// the current state of the garage layout.
+//
+// knownNodes is the set of KnownNode values returned from a
+// [garage.ClusterStatus], filtered to only include those pertinent to this
+// host.
+//
+// It is assumed that the active/targetAllocs pair has already been
+// validated using validateTargetAllocs.
+func calcStateTransition(
+	activeAllocs []daecommon.ConfigStorageAllocation,
+	knownNodes []garage.KnownNode,
+	targetAllocs []daecommon.ConfigStorageAllocation,
+) (
+	res StateTransition,
+) {
+	var (
+		activeAllocsWithKnownNodes = allocsWithKnownNode(
+			activeAllocs, knownNodes,
+		)
+		targetAllocsM = allocsByRPCPort(targetAllocs)
+	)
+
+	// Deal with additions/modifications to the cluster. These are easy, since
+	// all target allocations belong in the cluster no matter what.
+	for _, targetAlloc := range targetAllocs {
+		res.AddModifyAllocations = append(res.AddModifyAllocations, targetAlloc)
+	}
+
+	// Deal with removals from the cluster. These are harder.
+	for _, activeAlloc := range activeAllocsWithKnownNodes {
+
+		// If there's a corresponding targetAlloc then this alloc was added to
+		// AddModifyAllocations already. Even if it was previously draining,
+		// once it's added/modified it won't be anymore.
+		if _, ok := targetAllocsM[activeAlloc.RPCPort]; ok {
+			continue
+		}
+
+		// If it's draining then let it continue draining.
+		//
+		// If there's an associated role then this alloc is still part of the
+		// cluster, but it's not in the target set so it needs to be drained
+		// and removed.
+		if activeAlloc.Draining || activeAlloc.Role != nil {
+			res.DrainAllocations = append(res.DrainAllocations, activeAlloc)
+			continue
+		}
+
+		// The alloc is not in the target set, it's not draining, and it has no
+		// role in the cluster, so there's nothing more to do for it.
+	}
+
+	return
+}
diff --git a/go/daemon/network/glm/validate.go b/go/daemon/network/glm/validate.go
new file mode 100644
index 0000000..e735e0d
--- /dev/null
+++ b/go/daemon/network/glm/validate.go
@@ -0,0 +1,69 @@
+package glm
+
+import (
+	"errors"
+	"fmt"
+	"io/fs"
+	"isle/daemon/daecommon"
+	"os"
+)
+
+// targetAllocs is expected to have already been validated to be internally
+// consistent, ie to have no conflicting ports or directories within itself.
+func validateTargetAllocs(
+	activeAllocs, targetAllocs []daecommon.ConfigStorageAllocation,
+) error {
+	for _, targetAlloc := range targetAllocs {
+
+		var alreadyActive bool
+		for _, activeAlloc := range activeAllocs {
+			switch {
+			case targetAlloc.RPCPort == activeAlloc.RPCPort &&
+				targetAlloc.DataPath == activeAlloc.DataPath &&
+				targetAlloc.MetaPath == activeAlloc.MetaPath:
+				alreadyActive = true
+				break
+
+			// Changing the value of all fields besides these is supported.
+			case targetAlloc.RPCPort == activeAlloc.RPCPort:
+				return fmt.Errorf(
+					"RPC port %d is already being used", activeAlloc.RPCPort,
+				)
+
+			case targetAlloc.DataPath == activeAlloc.DataPath:
+				return fmt.Errorf(
+					"Data path %q is already being used", activeAlloc.DataPath,
+				)
+
+			case targetAlloc.MetaPath == activeAlloc.MetaPath:
+				return fmt.Errorf(
+					"Meta path %q is already being used", activeAlloc.MetaPath,
+				)
+			}
+		}
+
+		if !alreadyActive {
+			for _, dir := range []string{
+				targetAlloc.DataPath, targetAlloc.MetaPath,
+			} {
+				entries, err := os.ReadDir(dir)
+				if errors.Is(err, fs.ErrNotExist) {
+					continue
+				} else if err != nil {
+					return fmt.Errorf("reading contents of %q: %w", dir, err)
+				}
+
+				if len(entries) > 0 {
+					return fmt.Errorf(
+						"directory %q is already being used, please provide the path to an empty directory",
+						dir,
+					)
+				}
+			}
+		}
+
+		// targetAlloc validated
+	}
+
+	return nil
+}
diff --git a/go/daemon/network/glm/validate_test.go b/go/daemon/network/glm/validate_test.go
new file mode 100644
index 0000000..69e5e2b
--- /dev/null
+++ b/go/daemon/network/glm/validate_test.go
@@ -0,0 +1,190 @@
+package glm
+
+import (
+	"isle/daemon/daecommon"
+	"os"
+	"path/filepath"
+	"regexp"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func Test_validateTargetAllocs(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		label                      string
+		mkDir, mkFile              string
+		activeAllocs, targetAllocs []daecommon.ConfigStorageAllocation
+		wantErr                    string
+	}{
+		{
+			label: "empty/empty",
+		},
+		{
+			label: "empty/single",
+			targetAllocs: []daecommon.ConfigStorageAllocation{
+				{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
+			},
+		},
+		{
+			label: "empty/multiple",
+			targetAllocs: []daecommon.ConfigStorageAllocation{
+				{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
+				{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
+			},
+		},
+		{
+			label: "no change",
+			activeAllocs: []daecommon.ConfigStorageAllocation{
+				{DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901},
+				{DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900},
+			},
+			targetAllocs: 
[]daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901}, + }, + }, + { + label: "change capacity and add alloc", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 2, RPCPort: 3900}, + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901}, + }, + }, + { + label: "change rpc port", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3902}, + {DataPath: "A1", MetaPath: "A2", Capacity: 2, RPCPort: 3901}, + }, + wantErr: `Data path ".+/A1" is already being used`, + }, + { + label: "change data path", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901}, + {DataPath: "Aa", MetaPath: "A2", Capacity: 2, RPCPort: 3900}, + }, + wantErr: `RPC port 3900 is already being used`, + }, + { + label: "change meta path", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3901}, + {DataPath: "A1", MetaPath: "Aa", Capacity: 2, RPCPort: 3900}, + }, + wantErr: `RPC port 3900 is already being used`, + }, + { + label: "conflict rpc port", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "B1", MetaPath: "B2", Capacity: 1, RPCPort: 3900}, + }, + wantErr: `RPC port 3900 is already being used`, + }, + { + label: "conflict data path", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "B2", Capacity: 1, RPCPort: 3901}, + }, + wantErr: `Data path ".+/A1" is already being used`, + }, + { + label: "conflict meta path", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "B1", MetaPath: "A2", Capacity: 1, RPCPort: 3901}, + }, + wantErr: `Meta path ".+/A2" is already being used`, + }, + { + label: "directory/empty", + mkDir: "A1", + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + }, + { + label: "directory/with file", + mkFile: "A1/some file", + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + wantErr: `directory ".+/A1" is already being used`, + }, + { + label: "directory/active alloc with file", + mkFile: "A1/some file", + activeAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + targetAllocs: []daecommon.ConfigStorageAllocation{ + {DataPath: "A1", MetaPath: "A2", Capacity: 1, RPCPort: 3900}, + }, + }, + } + + for _, test := range tests { + t.Run(test.label, func(t *testing.T) { + 
dir := t.TempDir() + joinRoot := func(allocs []daecommon.ConfigStorageAllocation) { + for i := range allocs { + allocs[i].DataPath = filepath.Join(dir, allocs[i].DataPath) + allocs[i].MetaPath = filepath.Join(dir, allocs[i].MetaPath) + } + } + + joinRoot(test.activeAllocs) + joinRoot(test.targetAllocs) + + if test.mkDir != "" { + path := filepath.Join(dir, test.mkDir) + require.NoError(t, os.MkdirAll(path, 0755)) + } + + if test.mkFile != "" { + path := filepath.Join(dir, test.mkFile) + dirPath := filepath.Dir(path) + require.NoError(t, os.MkdirAll(dirPath, 0755)) + require.NoError(t, os.WriteFile(path, []byte("HI"), 0644)) + } + + err := validateTargetAllocs(test.activeAllocs, test.targetAllocs) + if test.wantErr == "" { + assert.NoError(t, err) + } else { + wantErrRegexp := regexp.MustCompile(test.wantErr) + assert.True( + t, + wantErrRegexp.MatchString(err.Error()), + "wantErr: %s\nerr:%s", + test.wantErr, + err, + ) + } + }) + } +}
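
Usage sketch (not part of the diff above): one way a caller might drive
GarageLayoutManager end to end, assuming a hypothetical apply callback standing
in for whatever code actually pushes layout changes to garage via its admin
API, and an illustrative "network" package name; only the glm method calls
themselves come from the interface introduced in this patch.

	package network

	import (
		"context"
		"fmt"

		"isle/daemon/daecommon"
		"isle/daemon/network/glm"
		"isle/garage"
	)

	// reconcileLayout validates the target allocations, calculates the
	// required state transition, carries it out via apply, and only then
	// commits the new active allocation set.
	func reconcileLayout(
		ctx context.Context,
		m glm.GarageLayoutManager,
		knownNodes []garage.KnownNode,
		targetAllocs []daecommon.ConfigStorageAllocation,
		apply func(context.Context, glm.StateTransition) error,
	) error {
		// Refuse target sets which conflict with the currently active one.
		if err := m.Validate(ctx, targetAllocs); err != nil {
			return fmt.Errorf("validating target allocations: %w", err)
		}

		// Work out which allocations should be added/modified and which
		// should be drained.
		st, err := m.CalculateStateTransition(ctx, knownNodes, targetAllocs)
		if err != nil {
			return fmt.Errorf("calculating state transition: %w", err)
		}

		// apply is responsible for the prescribed actions: adding/modifying
		// roles for st.AddModifyAllocations and removing roles for
		// st.DrainAllocationIDs().
		if err := apply(ctx, st); err != nil {
			return fmt.Errorf("applying state transition: %w", err)
		}

		// Record the new active allocation set only after the actions have
		// succeeded.
		return m.CommitStateTransition(ctx, st)
	}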