Files
kustomize/kstatus/wait/waitstate.go

170 lines
6.0 KiB
Go

package wait
import (
"fmt"
"reflect"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/kustomize/kstatus/status"
)
// waitState keeps the state about the resources and their last
// observed state. This is used to determine any changes in state
// so events can be sent when needed.
type waitState struct {
// ResourceWaitStates contains wait state for each of the resources,
// keyed by the identifier of the resource. newWaitState creates one
// entry for every identifier it is given.
ResourceWaitStates map[ResourceIdentifier]*resourceWaitState
// statusComputeFunc defines the function used to compute the state of
// a single resource. This is available for testing purposes. It is
// invoked by getEventResource for resources that were fetched
// successfully and are not being deleted.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
}
// resourceWaitState contains state information about an individual resource.
// The fields are updated by getEventResource and ResourceObserved as new
// observations of the resource arrive.
type resourceWaitState struct {
// FirstSeenGeneration holds a generation value recorded for the resource
// by getEventResource. It is nil while no generation has been recorded.
FirstSeenGeneration *int64
// HasBeenCurrent is set to true by getEventResource once the resource has
// been computed as Current or has been found not to exist.
// getEventResource never clears it again, so it reflects whether the
// resource reached Current at any point, not whether it is Current now.
HasBeenCurrent bool
// Observed is set to true by getEventResource once the resource has been
// fetched without error or has been found not to exist. AggregateStatus
// reports Unknown while any resource still has this unset.
Observed bool
// LastEvent is the most recent EventResource that ResourceObserved
// reported as a new event. It is nil until the first observation and is
// used to detect whether a later observation differs.
LastEvent *EventResource
}
// newWaitState builds a waitState that tracks every identifier in
// resourceIDs, each starting from an empty resourceWaitState, and that uses
// statusComputeFunc to compute the status of individual resources.
func newWaitState(resourceIDs []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState {
	ws := &waitState{
		ResourceWaitStates: make(map[ResourceIdentifier]*resourceWaitState, len(resourceIDs)),
		statusComputeFunc:  statusComputeFunc,
	}
	// Every resource gets its own zero-valued state: not yet observed,
	// never Current, and without a previous event.
	for i := range resourceIDs {
		ws.ResourceWaitStates[resourceIDs[i]] = new(resourceWaitState)
	}
	return ws
}
// AggregateStatus reduces the state of all tracked resources to one status:
// Unknown if at least one resource has not been observed yet, Current if
// every resource has been Current at some point, and InProgress otherwise.
// TODO: Ideally we would like this to be pluggable for different strategies.
func (w *waitState) AggregateStatus() status.Status {
	// Number of observed resources that have never reached Current.
	pending := 0
	for _, state := range w.ResourceWaitStates {
		switch {
		case !state.Observed:
			// A single unobserved resource makes the aggregate unknowable.
			return status.UnknownStatus
		case !state.HasBeenCurrent:
			pending++
		}
	}
	if pending > 0 {
		return status.InProgressStatus
	}
	return status.CurrentStatus
}
// ResourceObserved feeds a new observation of a resource into the waitState.
// resource is the object fetched from the cluster and err is the error, if
// any, that the fetch produced.
//
// It returns the EventResource describing the observation (identifier plus
// latest status) together with a bool that is true when this EventResource
// differs from the one produced by the previous observation of the same
// resource, and false if not. Callers use the bool to decide whether a new
// event should be sent.
func (w *waitState) ResourceObserved(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) {
	current := w.getEventResource(resourceID, resource, err)

	// No nil check on the map entry: the identifier passed in comes from
	// iterating over the keys of the map.
	state := w.ResourceWaitStates[resourceID]

	// The observation counts as new when there is no previous event or when
	// it is not deeply equal to the previous one. Only new events are stored.
	previous := state.LastEvent
	changed := previous == nil || !reflect.DeepEqual(current, *previous)
	if changed {
		state.LastEvent = &current
	}
	return current, changed
}
// getEventResource builds the EventResource describing the latest observation
// of the resource identified by resourceID, and updates the bookkeeping kept
// for that resource in w.ResourceWaitStates as a side effect.
//
// resource is the object fetched from the cluster and err is the error (if
// any) returned by that fetch. The outcome is decided in this order:
//   - a fetch error other than NotFound yields Unknown and carries the error;
//   - a NotFound error yields Current, because a non-existent resource is
//     treated as successfully deleted;
//   - a resource with a deletion timestamp yields Terminating;
//   - a failure of statusComputeFunc yields Unknown and carries the error;
//   - otherwise the status and message from statusComputeFunc are reported.
//
// resourceID must be a key of w.ResourceWaitStates. Callers obtain it by
// iterating over the keys of that map, so the entry is not checked for nil.
func (w *waitState) getEventResource(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) EventResource {
	// State recorded from previous observations of this resource.
	r := w.ResourceWaitStates[resourceID]

	notFound := err != nil && k8serrors.IsNotFound(errors.Cause(err))

	// If fetching the resource from the cluster failed, we don't really
	// know anything about the status of the resource, so simply
	// report the status as Unknown.
	if err != nil && !notFound {
		return EventResource{
			ResourceIdentifier: resourceID,
			Status:             status.UnknownStatus,
			Message:            fmt.Sprintf("Error: %s", err),
			Error:              err,
		}
	}

	// If we get here, we have successfully fetched the resource from
	// the cluster, or discovered that it doesn't exist.
	r.Observed = true

	// We treat a non-existent resource as Current. This is to properly
	// handle deletion scenarios.
	if notFound {
		r.HasBeenCurrent = true
		return EventResource{
			ResourceIdentifier: resourceID,
			Status:             status.CurrentStatus,
			Message:            "Resource has been deleted",
		}
	}

	// We want to capture the first seen generation of the resource. This
	// allows us to discover if a resource is updated while we are waiting
	// for it to become Current. Only record it when nothing has been
	// recorded yet, so later observations do not overwrite the first one.
	if r.FirstSeenGeneration == nil {
		gen := resource.GetGeneration()
		r.FirstSeenGeneration = &gen
	}

	if resource.GetDeletionTimestamp() != nil {
		return EventResource{
			ResourceIdentifier: resourceID,
			Status:             status.TerminatingStatus,
			Message:            "Resource is terminating",
		}
	}

	statusResult, err := w.statusComputeFunc(resource)
	// If we can't compute status for the resource, we report the status
	// as Unknown.
	if err != nil {
		return EventResource{
			ResourceIdentifier: resourceID,
			Status:             status.UnknownStatus,
			Message:            fmt.Sprintf("Error: %s", err),
			Error:              err,
		}
	}

	// We record whether a resource has ever been Current. This makes
	// sure we can report a set of resources as being Current if all
	// of them have reached the Current status at some point, but not
	// necessarily at the same time.
	if statusResult.Status == status.CurrentStatus {
		r.HasBeenCurrent = true
	}

	return EventResource{
		ResourceIdentifier: resourceID,
		Status:             statusResult.Status,
		Message:            statusResult.Message,
	}
}