Files
kustomize/kstatus/wait/wait.go

333 lines
11 KiB
Go

// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package wait
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kstatus/status"
)
const (
defaultNamespace = "default"
)
// ResourceIdentifier defines the functions needed to identify
// a resource in a cluster. This interface is implemented by
// both unstructured.Unstructured and the standard Kubernetes types.
type KubernetesObject interface {
GetName() string
GetNamespace() string
GroupVersionKind() schema.GroupVersionKind
}
// ResourceIdentifier contains the information needed to uniquely
// identify a resource in a cluster.
type ResourceIdentifier struct {
Name string
Namespace string
GroupKind schema.GroupKind
}
// Equals compares two ResourceIdentifiers and returns true if they
// refer to the same resource. Special handling is needed for namespace
// since an empty namespace for a namespace-scoped resource is defaulted
// to the "default" namespace.
func (r ResourceIdentifier) Equals(other ResourceIdentifier) bool {
isSameNamespace := r.Namespace == other.Namespace ||
(r.Namespace == "" && other.Namespace == defaultNamespace) ||
(r.Namespace == defaultNamespace && other.Namespace == "")
return r.GroupKind == other.GroupKind &&
r.Name == other.Name &&
isSameNamespace
}
// Resolver provides the functions for resolving status of a list of resources.
type Resolver struct {
// client is the client used to talk
// with the cluster. It uses the Reader interface
// from controller-runtime.
client client.Reader
// mapper is the RESTMapper needed to look up mappings
// for resource types.
mapper meta.RESTMapper
// statusComputeFunc defines which function should be used for computing
// the status of a resource. This is available for testing purposes.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
// pollInterval defines the frequency with which the resolver should poll
// the cluster for the state of resources. More frequent polling will
// lead to more load on the cluster.
pollInterval time.Duration
}
// NewResolver creates a new resolver with the provided client. Fetching
// and polling of resources will be done using the provided client.
func NewResolver(client client.Reader, mapper meta.RESTMapper, pollInterval time.Duration) *Resolver {
return &Resolver{
client: client,
mapper: mapper,
statusComputeFunc: status.Compute,
pollInterval: pollInterval,
}
}
// ResourceResult is the status result for a given resource. It provides
// information about the resource if the request was successful and an
// error if something went wrong.
type ResourceResult struct {
Result *status.Result
ResourceIdentifier ResourceIdentifier
Error error
}
// FetchAndResolveObjects returns the status for a list of kubernetes objects. These can be provided
// either as Unstructured resources or the specific resource types. It will return the status for each
// of them individually. The provided resources will only be used to get the information needed to
// fetch the updated state of the resources from the cluster.
func (r *Resolver) FetchAndResolveObjects(ctx context.Context, objects []KubernetesObject) []ResourceResult {
resourceIds := resourceIdentifiersFromObjects(objects)
return r.FetchAndResolve(ctx, resourceIds)
}
// FetchAndResolve returns the status for a list of ResourceIdentifiers. It will return
// the status for each of them individually.
func (r *Resolver) FetchAndResolve(ctx context.Context, resourceIDs []ResourceIdentifier) []ResourceResult {
var results []ResourceResult
for _, resourceID := range resourceIDs {
u, err := r.fetchResource(ctx, resourceID)
if err != nil {
if k8serrors.IsNotFound(errors.Cause(err)) {
results = append(results, ResourceResult{
ResourceIdentifier: resourceID,
Result: &status.Result{
Status: status.CurrentStatus,
Message: "Resource does not exist",
},
})
} else {
results = append(results, ResourceResult{
Result: &status.Result{
Status: status.UnknownStatus,
Message: fmt.Sprintf("Error fetching resource from cluster: %v", err),
},
ResourceIdentifier: resourceID,
Error: err,
})
}
continue
}
res, err := r.statusComputeFunc(u)
results = append(results, ResourceResult{
Result: res,
ResourceIdentifier: resourceID,
Error: err,
})
}
return results
}
// Event is returned through the channel returned after a call
// to WaitForStatus. It contains an update to either an individual
// resource or to the aggregate status for the set of resources.
type Event struct {
// Type defines which type of event this is.
Type EventType
// AggregateStatus is the aggregated status for all the provided resources.
AggregateStatus status.Status
// EventResource is information about the event to which this event pertains.
// This is only populated for ResourceUpdate events.
EventResource *EventResource
}
type EventType string
const (
// The status/message for a resource has changed. This also means the
// aggregate status might have changed.
ResourceUpdate EventType = "ResourceUpdate"
// All resources have reached the current status.
Completed EventType = "Completed"
// The wait was stopped before all resources could reach the
// Current status.
Aborted EventType = "Aborted"
)
// EventResource contains information about the resource for which
// a specific Event pertains.
type EventResource struct {
// Identifier contains information that identifies which resource
// this information is about.
ResourceIdentifier ResourceIdentifier
// Status is the latest status for the given resource.
Status status.Status
// Message is more details about the status.
Message string
// Error is set if there was a problem identifying the status
// of the resource. For example, if polling the cluster for information
// about the resource failed.
Error error
}
// WaitForStatus polls all the provided resources until all of them have reached the Current
// status or the timeout specified through the context is reached. Updates on the status
// of individual resources and the aggregate status is provided through the Event channel.
func (r *Resolver) WaitForStatusOfObjects(ctx context.Context, objects []KubernetesObject) <-chan Event {
resourceIds := resourceIdentifiersFromObjects(objects)
return r.WaitForStatus(ctx, resourceIds)
}
// WaitForStatus polls all the resources references by the provided ResourceIdentifiers until
// all of them have reached the Current status or the timeout specified through the context is
// reached. Updates on the status of individual resources and the aggregate status is provided
// through the Event channel.
func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event {
eventChan := make(chan Event)
go func() {
ticker := time.NewTicker(r.pollInterval)
defer func() {
ticker.Stop()
// Make sure the channel is closed so consumers can detect that
// we have completed.
close(eventChan)
}()
// No need to wait if we have no resources. We consider
// this a situation where the status is Current.
if len(resources) == 0 {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
EventResource: nil,
}
return
}
// Initiate a new waitStatus object to keep track of the
// resources while polling the state.
waitState := newWaitState(resources, r.statusComputeFunc)
// Check all resources immediately. If the aggregate status is already
// Current, we can exit immediately.
if r.checkAllResources(ctx, waitState, eventChan) {
return
}
// Loop until either all resources have reached the Current status
// or until the wait is cancelled through the context. In both cases
// we will break out of the loop by returning from the function.
for {
select {
case <-ctx.Done():
// The context has been cancelled, so report the most recent
// aggregate status, report it through the channel and then
// break out of the loop (which will close the channel).
eventChan <- Event{
Type: Aborted,
AggregateStatus: waitState.AggregateStatus(),
}
return
case <-ticker.C:
// Every time the ticker fires, we check the status of all
// resources. If the aggregate status has reached Current, checkAllResources
// will return true. If so, we just return.
if r.checkAllResources(ctx, waitState, eventChan) {
return
}
}
}
}()
return eventChan
}
// checkAllResources fetches all resources from the cluster,
// checks if their status has changed and send an event for each resource
// with a new status. In each event, we also include the latest aggregate
// status. Finally, if the aggregate status becomes Current, send a final
// Completed type event. If the aggregate status has become Current, this function
// will return true to signal that it is done.
func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, eventChan chan Event) bool {
for resourceID := range waitState.ResourceWaitStates {
// Make sure we have a local copy since we are passing
// pointers to this variable as parameters to functions
u, err := r.fetchResource(ctx, resourceID)
eventResource, updateObserved := waitState.ResourceObserved(resourceID, u, err)
// Find the aggregate status based on the new state for this resource.
aggStatus := waitState.AggregateStatus()
// We want events for changes in status for each resource, so send
// an event for this resource before checking if the aggregate status
// has become Current.
if updateObserved {
eventChan <- Event{
Type: ResourceUpdate,
AggregateStatus: aggStatus,
EventResource: &eventResource,
}
}
// If aggregate status is Current, we are done!
if aggStatus == status.CurrentStatus {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
}
return true
}
}
return false
}
// fetchResource gets the resource given by the identifier from the cluster
// through the client available in the Resolver. It returns the resource
// as an Unstructured.
func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) {
// We need to look up the preferred version for the GroupKind and
// whether the resource type is cluster scoped. We look this
// up with the RESTMapper.
mapping, err := r.mapper.RESTMapping(identifier.GroupKind)
if err != nil {
return nil, err
}
// Resources might not have the namespace set, which means we need to set
// it to `default` if the resource is namespace scoped.
namespace := identifier.Namespace
if namespace == "" && mapping.Scope.Name() == meta.RESTScopeNameNamespace {
namespace = defaultNamespace
}
key := types.NamespacedName{Name: identifier.Name, Namespace: namespace}
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(mapping.GroupVersionKind)
err = r.client.Get(ctx, key, u)
if err != nil {
return nil, errors.Wrap(err, "error fetching resource from cluster")
}
return u, nil
}