Add APIs for computing status based on fetching resource info from a

cluster
This commit is contained in:
Morten Torkildsen
2019-11-15 18:51:56 -08:00
parent e5382c59a2
commit a489f30183
15 changed files with 1371 additions and 13 deletions

57
kstatus/wait/doc.go Normal file
View File

@@ -0,0 +1,57 @@
// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Package wait contains functionality for getting the statuses
// of a list of kubernetes resources. Unlike the status package,
// the functions exposed in the wait package will talk to a
// live kubernetes cluster to get the latest state of resources
// and provides functionality for polling the cluster until the
// resources reach the Current status.
//
// FetchAndResolve will fetch resources from a cluster, compute the
// status for each of them and then return the results. The list of
// resources is defined as a slice of ResourceIdentifier, which is
// an interface that is implemented by the Unstructured type. It
// only requires functions for getting the apiVersion, kind, name
// and namespace of a resource.
//
// import (
// "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
// "k8s.io/apimachinery/pkg/types"
// "sigs.k8s.io/kustomize/kstatus/wait"
// )
//
// key := types.NamespacedName{Name: "name", Namespace: "namespace"}
// deployment := &unstructured.Unstructured{
// Object: map[string]interface{}{
// "apiVersion": "apps/v1",
// "kind": "Deployment",
// },
// }
// client.Get(context.Background(), key, deployment)
// resourceIdentifiers := []wait.ResourceIdentifier{deployment}
//
// resolver := wait.NewResolver(client)
// results := resolver.FetchAndResolve(context.Background(), resourceIdentifiers)
//
// WaitForStatus also looks up status for a list of resources, but it will
// block until all the provided resources has reached the Current status or
// the wait is cancelled through the passed-in context. The function returns
// a channel that will provide updates as the status of the different
// resources change.
//
// import (
// "sigs.k8s.io/kustomize/kstatus/wait"
// )
// resolver := wait.NewResolver(client)
// eventsChan := resolver.WaitForStatus(context.Background(), resourceIdentifiers, 2 * time.Second)
// for {
// select {
// case event, ok := <-eventsChan:
// if !ok {
// return
// }
// fmt.Printf(event) // do something useful here.
// }
// }
package wait

28
kstatus/wait/util.go Normal file
View File

@@ -0,0 +1,28 @@
package wait
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// keyFromResourceIdentifier creates a resourceKey from a ResourceIdentifier.
func keyFromResourceIdentifier(i ResourceIdentifier) resourceKey {
return resourceKey{
apiVersion: i.GetAPIVersion(),
kind: i.GetKind(),
name: i.GetName(),
namespace: i.GetNamespace(),
}
}
// keyFromObject creates a resourceKey from an Object.
func keyFromObject(obj runtime.Object) resourceKey {
gvk := obj.GetObjectKind().GroupVersionKind()
r := obj.(metav1.Object)
return resourceKey{
apiVersion: gvk.GroupVersion().String(),
kind: gvk.Kind,
name: r.GetName(),
namespace: r.GetNamespace(),
}
}

258
kstatus/wait/wait.go Normal file
View File

@@ -0,0 +1,258 @@
// Copyright 2019 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package wait
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kstatus/status"
)
// ResourceIdentifier defines the functions needed to identify
// a resource in a cluster. This interface is implemented by
// both unstructured.Unstructured and the standard Kubernetes types.
type ResourceIdentifier interface {
GetName() string
GetNamespace() string
GetAPIVersion() string
GetKind() string
}
// Resolver provides the functions for resolving status of a list of resources.
type Resolver struct {
// DynamicClient is the client used to talk
// with the cluster
client client.Reader
// statusComputeFunc defines which function should be used for computing
// the status of a resource. This is available for testing purposes.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
// pollInterval defines the frequency with which the resolver should poll
// the cluster for the state of resources. More frequent polling will
// lead to more load on the cluster.
pollInterval time.Duration
}
// NewResolver creates a new resolver with the provided client. Fetching
// and polling of resources will be done using the provided client.
func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver {
return &Resolver{
client: client,
statusComputeFunc: status.Compute,
pollInterval: pollInterval,
}
}
// ResourceResult is the status result for a given resource. It provides
// information about the resource if the request was successful and an
// error if something went wrong.
type ResourceResult struct {
Result *status.Result
Resource ResourceIdentifier
Error error
}
// FetchAndResolve returns the status for a list of resources. It will return
// the status for each of them individually. The slice of ResourceIdentifiers will
// only be used to get the information needed to fetch the updated state of
// the resources from the cluster.
func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIdentifier) []ResourceResult {
var results []ResourceResult
for _, resource := range resources {
u, err := r.fetchResource(ctx, resource)
if err != nil {
if k8serrors.IsNotFound(errors.Cause(err)) {
results = append(results, ResourceResult{
Resource: resource,
Result: &status.Result{
Status: status.CurrentStatus,
Message: "Resource does not exist",
},
})
} else {
results = append(results, ResourceResult{
Result: &status.Result{
Status: status.UnknownStatus,
Message: fmt.Sprintf("Error fetching resource from cluster: %v", err),
},
Resource: resource,
Error: err,
})
}
continue
}
res, err := r.statusComputeFunc(u)
results = append(results, ResourceResult{
Result: res,
Resource: resource,
Error: err,
})
}
return results
}
// Event is returned through the channel returned after a call
// to WaitForStatus. It contains an update to either an individual
// resource or to the aggregate status for the set of resources.
type Event struct {
// Type defines which type of event this is.
Type EventType
// AggregateStatus is the aggregated status for all the provided resources.
AggregateStatus status.Status
// EventResource is information about the event to which this event pertains.
// This is only populated for ResourceUpdate events.
EventResource *EventResource
}
type EventType string
const (
// The status/message for a resource has changed. This also means the
// aggregate status might have changed.
ResourceUpdate EventType = "ResourceUpdate"
// All resources have reached the current status.
Completed EventType = "Completed"
// The wait was stopped before all resources could reach the
// Current status.
Aborted EventType = "Aborted"
)
// EventResource contains information about the resource for which
// a specific Event pertains.
type EventResource struct {
// Identifier contains information that identifies which resource
// this information is about.
Identifier ResourceIdentifier
// Status is the latest status for the given resource.
Status status.Status
// Message is more details about the status.
Message string
// Error is set if there was a problem identifying the status
// of the resource. For example, if polling the cluster for information
// about the resource failed.
Error error
}
// WaitForStatus polls all the provided resources until all of them has
// reached the Current status. Updates the channel as resources change their status and
// when the wait is either completed or aborted.
func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event {
eventChan := make(chan Event)
go func() {
ticker := time.NewTicker(r.pollInterval)
defer func() {
ticker.Stop()
// Make sure the channel is closed so consumers can detect that
// we have completed.
close(eventChan)
}()
// No need to wait if we have no resources. We consider
// this a situation where the status is Current.
if len(resources) == 0 {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
EventResource: nil,
}
return
}
// Initiate a new waitStatus object to keep track of the
// resources while polling the state.
waitState := newWaitState(resources, r.statusComputeFunc)
// Loop until either all resources have reached the Current status
// or until the wait is cancelled through the context. In both cases
// we will break out of the loop by returning from the function.
for {
select {
case <-ctx.Done():
// The context has been cancelled, so report the most recent
// aggregate status, report it through the channel and then
// break out of the loop (which will close the channel).
eventChan <- Event{
Type: Aborted,
AggregateStatus: waitState.AggregateStatus(),
}
return
case <-ticker.C:
// Every time the ticker fires, fetch all resources from the cluster,
// check if their status has changed and send an event for each resource
// with a new status. In each event, we also include the latest aggregate
// status. Finally, if the aggregate status becomes Current, send a final
// Completed type event and then return.
for id := range waitState.ResourceWaitStates {
// Make sure we have a local copy since we are passing
// pointers to this variable as parameters to functions
identifier := id
u, err := r.fetchResource(ctx, &identifier)
eventResource, updateObserved := waitState.ResourceObserved(&identifier, u, err)
// Find the aggregate status based on the new state for this resource.
aggStatus := waitState.AggregateStatus()
// We want events for changes in status for each resource, so send
// an event for this resource before checking if the aggregate status
// has become Current.
if updateObserved {
eventChan <- Event{
Type: ResourceUpdate,
AggregateStatus: aggStatus,
EventResource: &eventResource,
}
}
// If aggregate status is Current, we are done!
if aggStatus == status.CurrentStatus {
eventChan <- Event{
Type: Completed,
AggregateStatus: status.CurrentStatus,
}
return
}
}
}
}
}()
return eventChan
}
// fetchResource gets the resource given by the identifier from the cluster
// through the client available in the Resolver. It returns the resource
// as an Unstructured.
func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) {
key := types.NamespacedName{Name: identifier.GetName(), Namespace: identifier.GetNamespace()}
u := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": identifier.GetAPIVersion(),
"kind": identifier.GetKind(),
},
}
err := r.client.Get(ctx, key, u)
//return u, err
if err != nil {
return nil, errors.Wrap(err, "error fetching resource from cluster")
}
return u, nil
}

561
kstatus/wait/wait_test.go Normal file
View File

@@ -0,0 +1,561 @@
package wait
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/kustomize/kstatus/status"
)
const (
testTimeout = 1 * time.Minute
testPollInterval = 1 * time.Second
)
func TestFetchAndResolve(t *testing.T) {
type result struct {
status status.Status
error bool
}
testCases := map[string]struct {
resources []runtime.Object
expectedResults []result
}{
"no resources": {
resources: []runtime.Object{},
expectedResults: []result{},
},
"single resource": {
resources: []runtime.Object{
&appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
Name: "myDeployment",
Namespace: "default",
},
},
},
expectedResults: []result{
{
status: status.InProgressStatus,
error: false,
},
},
},
"multiple resources": {
resources: []runtime.Object{
&appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "StatefulSet",
},
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
Name: "myStatefulSet",
Namespace: "default",
},
Spec: appsv1.StatefulSetSpec{
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
},
Status: appsv1.StatefulSetStatus{
ObservedGeneration: 1,
},
},
&corev1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
Name: "mySecret",
Namespace: "default",
},
},
},
expectedResults: []result{
{
status: status.CurrentStatus,
error: false,
},
{
status: status.CurrentStatus,
error: false,
},
},
},
}
for tn, tc := range testCases {
tc := tc
t.Run(tn, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, tc.resources...)
resolver := NewResolver(fakeClient, testPollInterval)
resolver.statusComputeFunc = status.Compute
var identifiers []ResourceIdentifier
for _, resource := range tc.resources {
gvk := resource.GetObjectKind().GroupVersionKind()
r := resource.(metav1.Object)
identifiers = append(identifiers, &resourceKey{
name: r.GetName(),
namespace: r.GetNamespace(),
apiVersion: gvk.GroupVersion().String(),
kind: gvk.Kind,
})
}
results := resolver.FetchAndResolve(context.TODO(), identifiers)
for i, res := range results {
id := identifiers[i]
expectedRes := tc.expectedResults[i]
rid := fmt.Sprintf("%s/%s", id.GetNamespace(), id.GetName())
if expectedRes.error {
if res.Error == nil {
t.Errorf("expected error for resource %s, but didn't get one", rid)
}
continue
}
if res.Error != nil {
t.Errorf("didn't expected error for resource %s, but got %v", rid, res.Error)
}
if got, want := res.Result.Status, expectedRes.status; got != want {
t.Errorf("expected status %s for resources %s, but got %s", want, rid, got)
}
}
})
}
}
func TestFetchAndResolveUnknownResource(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
resolver := NewResolver(fakeClient, testPollInterval)
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{
apiVersion: "apps/v1",
kind: "Deploymnet",
name: "myDeployment",
namespace: "default",
},
})
if want, got := 1, len(results); want != got {
t.Errorf("expected %d results, but got %d", want, got)
}
res := results[0]
if want, got := status.CurrentStatus, res.Result.Status; got != want {
t.Errorf("expected status %s, but got %s", want, got)
}
if res.Error != nil {
t.Errorf("expected no error, but got %v", res.Error)
}
}
func TestFetchAndResolveWithFetchError(t *testing.T) {
expectedError := errors.New("failed to fetch resource")
resolver := NewResolver(
&fakeReader{
Err: expectedError,
},
testPollInterval,
)
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{
apiVersion: "apps/v1",
kind: "Deploymnet",
name: "myDeployment",
namespace: "default",
},
})
if want, got := 1, len(results); want != got {
t.Errorf("expected %d results, but got %d", want, got)
}
res := results[0]
if want, got := status.UnknownStatus, res.Result.Status; got != want {
t.Errorf("expected status %s, but got %s", want, got)
}
if want, got := expectedError, errors.Cause(res.Error); got != want {
t.Errorf("expected error %v, but got %v", want, got)
}
}
func TestFetchAndResolveComputeStatusError(t *testing.T) {
expectedError := errors.New("this is a test")
resource := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
Name: "myDeployment",
Namespace: "default",
},
}
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, resource)
resolver := NewResolver(fakeClient, testPollInterval)
resolver.statusComputeFunc = func(u *unstructured.Unstructured) (*status.Result, error) {
return &status.Result{
Status: status.UnknownStatus,
Message: "Got an error",
}, expectedError
}
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{
apiVersion: resource.APIVersion,
kind: resource.Kind,
name: resource.GetName(),
namespace: resource.GetNamespace(),
},
})
if want, got := 1, len(results); want != got {
t.Errorf("expected %d results, but got %d", want, got)
}
res := results[0]
if want, got := expectedError, res.Error; got != want {
t.Errorf("expected error %v, but got %v", want, got)
}
if want, got := status.UnknownStatus, res.Result.Status; got != want {
t.Errorf("expected status %s, but got %s", want, got)
}
}
type fakeReader struct {
Called int
Err error
}
func (f *fakeReader) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error {
f.Called += 1
return f.Err
}
func (f *fakeReader) List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error {
return errors.New("list not used")
}
func TestWaitForStatus(t *testing.T) {
testCases := map[string]struct {
resources map[runtime.Object][]*status.Result
expectedResourceStatuses map[runtime.Object][]status.Status
expectedAggregateStatuses []status.Status
}{
"no resources": {
resources: map[runtime.Object][]*status.Result{},
expectedResourceStatuses: map[runtime.Object][]status.Status{},
expectedAggregateStatuses: []status.Status{
status.CurrentStatus,
},
},
"single resource": {
resources: map[runtime.Object][]*status.Result{
deploymentResource: {
{
Status: status.InProgressStatus,
Message: "FirstInProgress",
},
{
Status: status.InProgressStatus,
Message: "SecondInProgress",
},
{
Status: status.CurrentStatus,
Message: "CurrentProgress",
},
},
},
expectedResourceStatuses: map[runtime.Object][]status.Status{
deploymentResource: {
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
},
},
expectedAggregateStatuses: []status.Status{
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
status.CurrentStatus,
},
},
"multiple resource": {
resources: map[runtime.Object][]*status.Result{
statefulSetResource: {
{
Status: status.InProgressStatus,
Message: "FirstUnknown",
},
{
Status: status.InProgressStatus,
Message: "SecondInProgress",
},
{
Status: status.CurrentStatus,
Message: "CurrentProgress",
},
},
serviceResource: {
{
Status: status.CurrentStatus,
Message: "CurrentImmediately",
},
},
},
expectedResourceStatuses: map[runtime.Object][]status.Status{
statefulSetResource: {
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
},
serviceResource: {
status.CurrentStatus,
},
},
expectedAggregateStatuses: []status.Status{
status.UnknownStatus,
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
status.CurrentStatus,
},
},
}
for tn, tc := range testCases {
tc := tc
t.Run(tn, func(t *testing.T) {
var objs []runtime.Object
statusResults := make(map[resourceKey][]*status.Result)
var identifiers []ResourceIdentifier
for obj, statuses := range tc.resources {
objs = append(objs, obj)
identifier := keyFromObject(obj)
identifiers = append(identifiers, &identifier)
statusResults[identifier] = statuses
}
statusComputer := statusComputer{
results: statusResults,
resourceCallCount: make(map[resourceKey]int),
}
resolver := &Resolver{
client: fake.NewFakeClientWithScheme(scheme.Scheme, objs...),
statusComputeFunc: statusComputer.Compute,
pollInterval: testPollInterval,
}
eventChan := resolver.WaitForStatus(context.TODO(), identifiers)
var events []Event
timer := time.NewTimer(testTimeout)
loop:
for {
select {
case event, ok := <-eventChan:
if !ok {
break loop
}
events = append(events, event)
case <-timer.C:
t.Fatalf("timeout waiting for resources to reach current status")
}
}
var aggregateStatuses []status.Status
resourceStatuses := make(map[resourceKey][]status.Status)
for _, e := range events {
aggregateStatuses = append(aggregateStatuses, e.AggregateStatus)
if e.EventResource != nil {
identifier := keyFromResourceIdentifier(e.EventResource.Identifier)
resourceStatuses[identifier] = append(resourceStatuses[identifier], e.EventResource.Status)
}
}
for resource, expectedStatuses := range tc.expectedResourceStatuses {
identifier := keyFromObject(resource)
actualStatuses := resourceStatuses[identifier]
if !reflect.DeepEqual(expectedStatuses, actualStatuses) {
t.Errorf("expected statuses %v for resource %s/%s, but got %v", expectedStatuses, identifier.namespace, identifier.name, actualStatuses)
}
}
if !reflect.DeepEqual(tc.expectedAggregateStatuses, aggregateStatuses) {
t.Errorf("expected aggregate statuses %v, but got %v", tc.expectedAggregateStatuses, aggregateStatuses)
}
})
}
}
func TestWaitForStatusDeletedResources(t *testing.T) {
statusComputer := statusComputer{
results: make(map[resourceKey][]*status.Result),
resourceCallCount: make(map[resourceKey]int),
}
resolver := &Resolver{
client: fake.NewFakeClientWithScheme(scheme.Scheme),
statusComputeFunc: statusComputer.Compute,
pollInterval: testPollInterval,
}
depResourceIdentifier := keyFromObject(deploymentResource)
serviceResourceIdentifier := keyFromObject(serviceResource)
identifiers := []ResourceIdentifier{
&depResourceIdentifier,
&serviceResourceIdentifier,
}
eventChan := resolver.WaitForStatus(context.TODO(), identifiers)
var events []Event
timer := time.NewTimer(testTimeout)
loop:
for {
select {
case event, ok := <-eventChan:
if !ok {
break loop
}
events = append(events, event)
case <-timer.C:
t.Fatalf("timeout waiting for resources to reach current status")
}
}
expectedEvents := []struct {
aggregateStatus status.Status
hasResource bool
resourceStatus status.Status
}{
{
aggregateStatus: status.UnknownStatus,
hasResource: true,
resourceStatus: status.CurrentStatus,
},
{
aggregateStatus: status.CurrentStatus,
hasResource: true,
resourceStatus: status.CurrentStatus,
},
{
aggregateStatus: status.CurrentStatus,
hasResource: false,
},
}
if want, got := len(expectedEvents), len(events); got != want {
t.Errorf("expected %d events, but got %d", want, got)
}
for i, e := range events {
ee := expectedEvents[i]
if want, got := ee.aggregateStatus, e.AggregateStatus; got != want {
t.Errorf("expected event %d to be %s, but got %s", i, want, got)
}
if ee.hasResource {
if want, got := ee.resourceStatus, e.EventResource.Status; want != got {
t.Errorf("expected resource event %d to be %s, but got %s", i, want, got)
}
}
}
}
type statusComputer struct {
t *testing.T
results map[resourceKey][]*status.Result
resourceCallCount map[resourceKey]int
}
func (s *statusComputer) Compute(u *unstructured.Unstructured) (*status.Result, error) {
identifier := resourceKey{
apiVersion: u.GetAPIVersion(),
kind: u.GetKind(),
name: u.GetName(),
namespace: u.GetNamespace(),
}
resourceResults, ok := s.results[identifier]
if !ok {
s.t.Fatalf("No results available for resource %s/%s", u.GetNamespace(), u.GetName())
}
callCount := s.resourceCallCount[identifier]
var res *status.Result
if len(resourceResults) <= callCount {
res = resourceResults[len(resourceResults)-1]
} else {
res = resourceResults[callCount]
}
s.resourceCallCount[identifier] = callCount + 1
return res, nil
}
var deploymentResource = &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: "myDeployment",
Namespace: "default",
},
}
var statefulSetResource = &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "StatefulSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: "myStatefulSet",
Namespace: "default",
},
}
var serviceResource = &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: "myService",
Namespace: "default",
},
}

211
kstatus/wait/waitstate.go Normal file
View File

@@ -0,0 +1,211 @@
package wait
import (
"fmt"
"reflect"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/kustomize/kstatus/status"
)
// resourceKey is a minimal implementation of
// the ResourceIdentifier interface.
type resourceKey struct {
name string
namespace string
apiVersion string
kind string
}
// GetName returns the name of the resource.
func (r *resourceKey) GetName() string {
return r.name
}
// GetNamespace returns the namespace of the resource.
func (r *resourceKey) GetNamespace() string {
return r.namespace
}
// GetAPIVersion returns the API version of the resource.
func (r *resourceKey) GetAPIVersion() string {
return r.apiVersion
}
// GetKind returns the Kind of the resource.
func (r *resourceKey) GetKind() string {
return r.kind
}
// waitState keeps the state about the resources and their last
// observed state. This is used to determine any changes in state
// so events can be sent when needed.
type waitState struct {
// ResourceWaitStates contains wait state for each of the resources.
ResourceWaitStates map[resourceKey]*resourceWaitState
// statusComputeFunc defines the function used to compute the state of
// a single resource. This is available for testing purposes.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
}
// resourceWaitState contains state information about an individual resource.
type resourceWaitState struct {
FirstSeenGeneration *int64
HasBeenCurrent bool
Observed bool
LastEvent *EventResource
}
// newWaitState creates a new waitState object and initializes it with the
// provided slice of resources and the provided statusComputeFunc.
func newWaitState(resources []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState {
resourceWaitStates := make(map[resourceKey]*resourceWaitState)
for _, r := range resources {
identifier := resourceKey{
apiVersion: r.GetAPIVersion(),
kind: r.GetKind(),
name: r.GetName(),
namespace: r.GetNamespace(),
}
resourceWaitStates[identifier] = &resourceWaitState{}
}
return &waitState{
ResourceWaitStates: resourceWaitStates,
statusComputeFunc: statusComputeFunc,
}
}
// AggregateStatus computes the aggregate status for all the resources.
// TODO: Ideally we would like this to be pluggable for different strategies.
func (w *waitState) AggregateStatus() status.Status {
allCurrent := true
for _, rws := range w.ResourceWaitStates {
if !rws.Observed {
return status.UnknownStatus
}
if !rws.HasBeenCurrent {
allCurrent = false
}
}
if allCurrent {
return status.CurrentStatus
}
return status.InProgressStatus
}
// ResourceObserved notifies the waitState that we have new state for
// a resource. This also accepts an error in case fetching the resource
// from a cluster failed. It returns an EventResource object that contains
// information about the observed resource, including the identifier and
// the latest status for the resource. The function also returns a bool value
// that will be true if the status of the observed resource has changed
// since the previous observation and false it not. This is used to determine
// whether a new event should be sent based on this observation.
func (w *waitState) ResourceObserved(id ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) {
identifier := resourceKey{
name: id.GetName(),
namespace: id.GetNamespace(),
apiVersion: id.GetAPIVersion(),
kind: id.GetKind(),
}
// Check for nil is not needed here as the id passed in comes
// from iterating over the keys of the map.
rws := w.ResourceWaitStates[identifier]
eventResource := w.getEventResource(identifier, resource, err)
// If the new eventResource is identical to the previous one, we return
// with the last return value indicating this is not a new event.
if rws.LastEvent != nil && reflect.DeepEqual(eventResource, *rws.LastEvent) {
return eventResource, false
}
rws.LastEvent = &eventResource
return eventResource, true
}
// getEventResource creates a new EventResource for the resource identified by
// the provided resourceKey. The EventResource contains information about the
// latest status for the given resource, so it computes status for the resource
// as well as check for deletion.
func (w *waitState) getEventResource(identifier resourceKey, resource *unstructured.Unstructured, err error) EventResource {
// Get the resourceWaitState for this resource. It contains information
// of the previous observed statuses. We don't need to check for nil here
// as the identifier comes from iterating over the keys of the
// ResourceWaitState map.
r := w.ResourceWaitStates[identifier]
// If fetching the resource from the cluster failed, we don't really
// know anything about the status of the resource, so simply
// report the status as Unknown.
if err != nil && !k8serrors.IsNotFound(errors.Cause(err)) {
return EventResource{
Identifier: &identifier,
Status: status.UnknownStatus,
Message: fmt.Sprintf("Error: %s", err),
Error: err,
}
}
// If we get here, we have successfully fetched the resource from
// the cluster, or discovered that it doesn't exist.
r.Observed = true
// We treat a non-existent resource as Current. This is to properly
// handle deletion scenarios.
if k8serrors.IsNotFound(errors.Cause(err)) {
r.HasBeenCurrent = true
return EventResource{
Identifier: &identifier,
Status: status.CurrentStatus,
Message: fmt.Sprintf("Resource has been deleted"),
}
}
// We want to capture the first seen generation of the resource. This
// allows us to discover if a resource is updated while we are waiting
// for it to become Current.
if r.FirstSeenGeneration != nil {
gen := resource.GetGeneration()
r.FirstSeenGeneration = &gen
}
if resource.GetDeletionTimestamp() != nil {
return EventResource{
Identifier: &identifier,
Status: status.TerminatingStatus,
Message: fmt.Sprintf("Resource is terminating"),
}
}
statusResult, err := w.statusComputeFunc(resource)
// If we can't compute status for the resource, we report the status
// as Unknown.
if err != nil {
return EventResource{
Identifier: &identifier,
Status: status.UnknownStatus,
Message: fmt.Sprintf("Error: %s", err),
Error: err,
}
}
// We record whether a resource has ever been Current. This makes
// sure we can report a set of resources as being Current if all
// of them has reached the Current status at some point, but not
// necessarily at the same time.
if statusResult.Status == status.CurrentStatus {
r.HasBeenCurrent = true
}
return EventResource{
Identifier: &identifier,
Status: statusResult.Status,
Message: statusResult.Message,
}
}