Files
kustomize/kstatus/wait/wait.go
Morten Torkildsen 1b3b8522f9 cli for status
2019-12-11 13:13:09 -08:00

276 lines
8.7 KiB
Go

// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package wait
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kstatus/status"
)
// ResourceIdentifier defines the functions needed to identify
// a resource in a cluster. This interface is implemented by
// both unstructured.Unstructured and the standard Kubernetes types.
type ResourceIdentifier interface {
GetName() string
GetNamespace() string
GetAPIVersion() string
GetKind() string
}
// Resolver provides the functions for resolving status of a list of resources.
type Resolver struct {
// DynamicClient is the client used to talk
// with the cluster
client client.Reader
// statusComputeFunc defines which function should be used for computing
// the status of a resource. This is available for testing purposes.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
// pollInterval defines the frequency with which the resolver should poll
// the cluster for the state of resources. More frequent polling will
// lead to more load on the cluster.
pollInterval time.Duration
}
// NewResolver creates a new resolver with the provided client. Fetching
// and polling of resources will be done using the provided client.
func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver {
return &Resolver{
client: client,
statusComputeFunc: status.Compute,
pollInterval: pollInterval,
}
}
// ResourceResult is the status result for a given resource. It provides
// information about the resource if the request was successful and an
// error if something went wrong.
type ResourceResult struct {
Result *status.Result
Resource ResourceIdentifier
Error error
}
// FetchAndResolve returns the status for a list of resources. It will return
// the status for each of them individually. The slice of ResourceIdentifiers will
// only be used to get the information needed to fetch the updated state of
// the resources from the cluster.
func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIdentifier) []ResourceResult {
var results []ResourceResult
for _, resource := range resources {
u, err := r.fetchResource(ctx, resource)
if err != nil {
if k8serrors.IsNotFound(errors.Cause(err)) {
results = append(results, ResourceResult{
Resource: resource,
Result: &status.Result{
Status: status.CurrentStatus,
Message: "Resource does not exist",
},
})
} else {
results = append(results, ResourceResult{
Result: &status.Result{
Status: status.UnknownStatus,
Message: fmt.Sprintf("Error fetching resource from cluster: %v", err),
},
Resource: resource,
Error: err,
})
}
continue
}
res, err := r.statusComputeFunc(u)
results = append(results, ResourceResult{
Result: res,
Resource: resource,
Error: err,
})
}
return results
}
// Event is returned through the channel returned after a call
// to WaitForStatus. It contains an update to either an individual
// resource or to the aggregate status for the set of resources.
type Event struct {
// Type defines which type of event this is.
Type EventType
// AggregateStatus is the aggregated status for all the provided resources.
AggregateStatus status.Status
// EventResource is information about the event to which this event pertains.
// This is only populated for ResourceUpdate events.
EventResource *EventResource
}
type EventType string
const (
// The status/message for a resource has changed. This also means the
// aggregate status might have changed.
ResourceUpdate EventType = "ResourceUpdate"
// All resources have reached the current status.
Completed EventType = "Completed"
// The wait was stopped before all resources could reach the
// Current status.
Aborted EventType = "Aborted"
)
// EventResource contains information about the resource for which
// a specific Event pertains.
type EventResource struct {
// Identifier contains information that identifies which resource
// this information is about.
Identifier ResourceIdentifier
// Status is the latest status for the given resource.
Status status.Status
// Message is more details about the status.
Message string
// Error is set if there was a problem identifying the status
// of the resource. For example, if polling the cluster for information
// about the resource failed.
Error error
}
// WaitForStatus polls all the provided resources until all of them has
// reached the Current status. Updates the channel as resources change their status and
// when the wait is either completed or aborted.
func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event {
eventChan := make(chan Event)
go func() {
ticker := time.NewTicker(r.pollInterval)
defer func() {
ticker.Stop()
// Make sure the channel is closed so consumers can detect that
// we have completed.
close(eventChan)
}()
// No need to wait if we have no resources. We consider
// this a situation where the status is Current.
if len(resources) == 0 {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
EventResource: nil,
}
return
}
// Initiate a new waitStatus object to keep track of the
// resources while polling the state.
waitState := newWaitState(resources, r.statusComputeFunc)
// Check all resources immediately. If the aggregate status is already
// Current, we can exit immediately.
if r.checkAllResources(ctx, waitState, eventChan) {
return
}
// Loop until either all resources have reached the Current status
// or until the wait is cancelled through the context. In both cases
// we will break out of the loop by returning from the function.
for {
select {
case <-ctx.Done():
// The context has been cancelled, so report the most recent
// aggregate status, report it through the channel and then
// break out of the loop (which will close the channel).
eventChan <- Event{
Type: Aborted,
AggregateStatus: waitState.AggregateStatus(),
}
return
case <-ticker.C:
// Every time the ticker fires, we check the status of all
// resources. If the aggregate status has reached Current, checkAllResources
// will return true. If so, we just return.
if r.checkAllResources(ctx, waitState, eventChan) {
return
}
}
}
}()
return eventChan
}
// checkAllResources fetches all resources from the cluster,
// checks if their status has changed and send an event for each resource
// with a new status. In each event, we also include the latest aggregate
// status. Finally, if the aggregate status becomes Current, send a final
// Completed type event. If the aggregate status has become Current, this function
// will return true to signal that it is done.
func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, eventChan chan Event) bool {
for id := range waitState.ResourceWaitStates {
// Make sure we have a local copy since we are passing
// pointers to this variable as parameters to functions
identifier := id
u, err := r.fetchResource(ctx, &identifier)
eventResource, updateObserved := waitState.ResourceObserved(&identifier, u, err)
// Find the aggregate status based on the new state for this resource.
aggStatus := waitState.AggregateStatus()
// We want events for changes in status for each resource, so send
// an event for this resource before checking if the aggregate status
// has become Current.
if updateObserved {
eventChan <- Event{
Type: ResourceUpdate,
AggregateStatus: aggStatus,
EventResource: &eventResource,
}
}
// If aggregate status is Current, we are done!
if aggStatus == status.CurrentStatus {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
}
return true
}
}
return false
}
// fetchResource gets the resource given by the identifier from the cluster
// through the client available in the Resolver. It returns the resource
// as an Unstructured.
func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) {
key := types.NamespacedName{Name: identifier.GetName(), Namespace: identifier.GetNamespace()}
u := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": identifier.GetAPIVersion(),
"kind": identifier.GetKind(),
},
}
err := r.client.Get(ctx, key, u)
//return u, err
if err != nil {
return nil, errors.Wrap(err, "error fetching resource from cluster")
}
return u, nil
}