diff --git a/cmd/kubectl/kubectlcobra/status.go b/cmd/kubectl/kubectlcobra/status.go index 38a82670a..3db008362 100644 --- a/cmd/kubectl/kubectlcobra/status.go +++ b/cmd/kubectl/kubectlcobra/status.go @@ -10,8 +10,8 @@ import ( "time" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/kubectl/pkg/cmd/util" @@ -47,7 +47,12 @@ func (s *StatusOptions) AddFlags(c *cobra.Command) { } func (s *StatusOptions) waitForStatus(infos []*resource.Info) error { - c, err := getClient(s.factory) + mapper, err := getRESTMapper(s.factory) + if err != nil { + return err + } + + c, err := getClient(s.factory, mapper) if err != nil { return err } @@ -55,16 +60,15 @@ func (s *StatusOptions) waitForStatus(infos []*resource.Info) error { ctx, cancel := context.WithTimeout(context.Background(), s.timeout) defer cancel() - resolver := wait.NewResolver(c, s.period) + resolver := wait.NewResolver(c, mapper, s.period) ch := resolver.WaitForStatus(ctx, infosToResourceIdentifiers(infos)) for msg := range ch { switch msg.Type { case wait.ResourceUpdate: - id := msg.EventResource.Identifier - gv, _ := schema.ParseGroupVersion(id.GetAPIVersion()) - gvk := gv.WithKind(id.GetKind()) - fmt.Fprintf(s.ioStreams.Out, "%s/%s is %s: %s\n", strings.ToLower(gvk.GroupKind().String()), id.GetName(), msg.EventResource.Status.String(), msg.EventResource.Message) + id := msg.EventResource.ResourceIdentifier + gk := id.GroupKind + fmt.Fprintf(s.ioStreams.Out, "%s/%s is %s: %s\n", strings.ToLower(gk.String()), id.Name, msg.EventResource.Status.String(), msg.EventResource.Message) case wait.Completed: fmt.Fprint(s.ioStreams.Out, "all resources has reached the Current status\n") case wait.Aborted: @@ -78,18 +82,21 @@ func infosToResourceIdentifiers(infos []*resource.Info) []wait.ResourceIdentifie var resources []wait.ResourceIdentifier for _, info := range infos { u := info.Object.(*unstructured.Unstructured) - resources = append(resources, u) + resources = append(resources, wait.ResourceIdentifier{ + GroupKind: u.GroupVersionKind().GroupKind(), + Namespace: u.GetNamespace(), + Name: u.GetName(), + }) } return resources } -func getClient(f util.Factory) (client.Reader, error) { - config, err := f.ToRESTConfig() - if err != nil { - return nil, err - } +func getRESTMapper(f util.Factory) (meta.RESTMapper, error) { + return f.ToRESTMapper() +} - mapper, err := f.ToRESTMapper() +func getClient(f util.Factory, mapper meta.RESTMapper) (client.Reader, error) { + config, err := f.ToRESTConfig() if err != nil { return nil, err } diff --git a/cmd/resource/status/cmd/events.go b/cmd/resource/status/cmd/events.go index 818f2d333..3e981a2d2 100644 --- a/cmd/resource/status/cmd/events.go +++ b/cmd/resource/status/cmd/events.go @@ -10,14 +10,13 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "sigs.k8s.io/kustomize/cmd/resource/status/generateddocs/commands" - "sigs.k8s.io/kustomize/kstatus/wait" "sigs.k8s.io/kustomize/kyaml/kio" ) // GetEventsRunner returns a command EventsRunner. func GetEventsRunner() *EventsRunner { r := &EventsRunner{ - createClientFunc: createClient, + newResolverFunc: newResolver, } c := &cobra.Command{ Use: "events DIR...", @@ -49,18 +48,16 @@ type EventsRunner struct { Timeout time.Duration Command *cobra.Command - createClientFunc createClientFunc + newResolverFunc newResolverFunc } func (r *EventsRunner) runE(c *cobra.Command, args []string) error { ctx := context.Background() - // Create a client and use it to set up a new resolver. - client, err := r.createClientFunc() + resolver, err := r.newResolverFunc(r.Interval) if err != nil { - return errors.Wrap(err, "error creating client") + return errors.Wrap(err, "error creating resolver") } - resolver := wait.NewResolver(client, r.Interval) // Set up a CaptureIdentifierFilter and run all inputs through the // filter with the pipeline to capture the inventory of resources diff --git a/cmd/resource/status/cmd/events_test.go b/cmd/resource/status/cmd/events_test.go index ee3225079..b570302df 100644 --- a/cmd/resource/status/cmd/events_test.go +++ b/cmd/resource/status/cmd/events_test.go @@ -9,6 +9,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/wait" ) @@ -20,7 +22,7 @@ func TestEventsNoResources(t *testing.T) { fakeClient := &FakeClient{} r := GetEventsRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -64,7 +66,7 @@ metadata: } r := GetEventsRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment")) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -141,7 +143,8 @@ items: } r := GetEventsRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, corev1.SchemeGroupVersion.WithKind("Pod"), + corev1.SchemeGroupVersion.WithKind("Service")) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) diff --git a/cmd/resource/status/cmd/fetch.go b/cmd/resource/status/cmd/fetch.go index 8bc100766..e164fc8b4 100644 --- a/cmd/resource/status/cmd/fetch.go +++ b/cmd/resource/status/cmd/fetch.go @@ -18,7 +18,7 @@ import ( // GetFetchRunner returns a command FetchRunner. func GetFetchRunner() *FetchRunner { r := &FetchRunner{ - createClientFunc: createClient, + newResolverFunc: newResolver, } c := &cobra.Command{ Use: "fetch DIR...", @@ -44,20 +44,17 @@ type FetchRunner struct { IncludeSubpackages bool Command *cobra.Command - createClientFunc createClientFunc + newResolverFunc newResolverFunc } func (r *FetchRunner) runE(c *cobra.Command, args []string) error { ctx := context.Background() - // Create a new client and use it to set up a resolver. - k8sClient, err := r.createClientFunc() + resolver, err := r.newResolverFunc(time.Minute) if err != nil { - return errors.Wrap(err, "error creating k8sClient") + return errors.Wrap(err, "error creating resolver") } - resolver := wait.NewResolver(k8sClient, time.Minute) - // Set up a CaptureIdentifierFilter and run all inputs through the // filter with the pipeline to capture the inventory of resources // which we are interested in. @@ -108,7 +105,7 @@ func (f FetchStatusInfo) CurrentStatus() StatusData { var resourceData []ResourceStatusData for _, res := range f.Results { rsd := ResourceStatusData{ - Identifier: res.Resource, + Identifier: res.ResourceIdentifier, } if res.Error != nil { rsd.Status = status.UnknownStatus diff --git a/cmd/resource/status/cmd/fetch_test.go b/cmd/resource/status/cmd/fetch_test.go index 61a5259c4..7c26a3b0b 100644 --- a/cmd/resource/status/cmd/fetch_test.go +++ b/cmd/resource/status/cmd/fetch_test.go @@ -25,7 +25,7 @@ func TestEmptyManifest(t *testing.T) { fakeClient := fake.NewFakeClientWithScheme(scheme) r := GetFetchRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -65,7 +65,7 @@ metadata: fakeClient := fake.NewFakeClientWithScheme(scheme, deployment) r := GetFetchRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment")) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -78,7 +78,7 @@ metadata: tableOutput := parseTableOutput(t, cleanOutput) expectedResource := ResourceIdentifier{ - apiVersion: "apps/v1", + apiVersion: "apps", kind: "Deployment", namespace: "default", name: "bar", @@ -139,7 +139,8 @@ metadata: outBuffer := &bytes.Buffer{} r := GetFetchRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment"), + v1.SchemeGroupVersion.WithKind("Service")) r.Command.SetArgs([]string{d}) r.Command.SetOut(outBuffer) @@ -152,7 +153,7 @@ metadata: tableOutput := parseTableOutput(t, cleanOutput) expectedDeploymentResource := ResourceIdentifier{ - apiVersion: "apps/v1", + apiVersion: "apps", kind: "Deployment", namespace: "default", name: "foo", @@ -162,7 +163,7 @@ metadata: verifyOutputContains(t, tableOutput, expectedDeploymentResource, expectedDeploymentStatus, expectedDeploymentMessage) expectedServiceResource := ResourceIdentifier{ - apiVersion: "v1", + apiVersion: "", kind: "Service", namespace: "default", name: "foo", diff --git a/cmd/resource/status/cmd/helpers_test.go b/cmd/resource/status/cmd/helpers_test.go index 30fb02bcb..c6892cb4b 100644 --- a/cmd/resource/status/cmd/helpers_test.go +++ b/cmd/resource/status/cmd/helpers_test.go @@ -6,11 +6,15 @@ import ( "regexp" "strings" "testing" + "time" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kustomize/kstatus/status" + "sigs.k8s.io/kustomize/kstatus/wait" ) type TableOutput struct { @@ -232,10 +236,25 @@ func (f *FakeClient) Get(_ context.Context, _ client.ObjectKey, obj runtime.Obje return callbackFunc(u) } -func (f *FakeClient) List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error { +func (f *FakeClient) List(context.Context, runtime.Object, ...client.ListOption) error { return nil } +func fakeResolver(fakeClient client.Reader, mapperTypes ...schema.GroupVersionKind) newResolverFunc { + return func(pollInterval time.Duration) (*wait.Resolver, error) { + var groupVersions []schema.GroupVersion + for _, gvk := range mapperTypes { + groupVersions = append(groupVersions, gvk.GroupVersion()) + } + mapper := meta.NewDefaultRESTMapper(groupVersions) + for _, gvk := range mapperTypes { + mapper.Add(gvk, meta.RESTScopeNamespace) + } + + return wait.NewResolver(fakeClient, mapper, pollInterval), nil + } +} + func joinStatuses(statuses []status.Status) string { var stringStatuses []string for _, s := range statuses { diff --git a/cmd/resource/status/cmd/print.go b/cmd/resource/status/cmd/print.go index 5f940b334..fbf872002 100644 --- a/cmd/resource/status/cmd/print.go +++ b/cmd/resource/status/cmd/print.go @@ -61,8 +61,7 @@ var ( width: 25, colorFunc: defaultColorFunc, contentFunc: func(data ResourceStatusData) string { - return fmt.Sprintf("%s/%s", data.Identifier.GetAPIVersion(), - data.Identifier.GetKind()) + return fmt.Sprintf("%s/%s", data.Identifier.GroupKind.Group, data.Identifier.GroupKind.Kind) }, }, namespaceColumn: { @@ -70,7 +69,7 @@ var ( width: 15, colorFunc: defaultColorFunc, contentFunc: func(data ResourceStatusData) string { - return data.Identifier.GetNamespace() + return data.Identifier.Namespace }, }, nameColumn: { @@ -78,7 +77,7 @@ var ( width: 20, colorFunc: defaultColorFunc, contentFunc: func(data ResourceStatusData) string { - return data.Identifier.GetName() + return data.Identifier.Name }, }, statusColumn: { @@ -255,8 +254,8 @@ var ( width: 20, requireResourceUpdateEvent: true, contentFunc: func(event wait.Event) string { - return fmt.Sprintf("%s/%s", event.EventResource.Identifier.GetAPIVersion(), - event.EventResource.Identifier.GetKind()) + return fmt.Sprintf("%s/%s", event.EventResource.ResourceIdentifier.GroupKind.Group, + event.EventResource.ResourceIdentifier.GroupKind.Kind) }, }, { @@ -264,7 +263,7 @@ var ( width: 15, requireResourceUpdateEvent: true, contentFunc: func(event wait.Event) string { - return event.EventResource.Identifier.GetNamespace() + return event.EventResource.ResourceIdentifier.Namespace }, }, { @@ -272,7 +271,7 @@ var ( width: 20, requireResourceUpdateEvent: true, contentFunc: func(event wait.Event) string { - return event.EventResource.Identifier.GetName() + return event.EventResource.ResourceIdentifier.Name }, }, { diff --git a/cmd/resource/status/cmd/util.go b/cmd/resource/status/cmd/util.go index be3f91523..bd745191f 100644 --- a/cmd/resource/status/cmd/util.go +++ b/cmd/resource/status/cmd/util.go @@ -4,7 +4,10 @@ package cmd import ( + "time" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -22,23 +25,23 @@ func init() { _ = clientgoscheme.AddToScheme(scheme) } -type createClientFunc func() (client.Reader, error) +type newResolverFunc func(pollInterval time.Duration) (*wait.Resolver, error) -// createClient returns a client for talking to a Kubernetes cluster. The client -// is from controller-runtime. -func createClient() (client.Reader, error) { +// newResolver returns a new resolver that can resolve status for resources based +// on polling the cluster. +func newResolver(pollInterval time.Duration) (*wait.Resolver, error) { config := ctrl.GetConfigOrDie() mapper, err := apiutil.NewDiscoveryRESTMapper(config) if err != nil { return nil, err } - return client.New(config, client.Options{Scheme: scheme, Mapper: mapper}) -} -func newClientFunc(c client.Reader) func() (client.Reader, error) { - return func() (client.Reader, error) { - return c, nil + c, err := client.New(config, client.Options{Scheme: scheme, Mapper: mapper}) + if err != nil { + return nil, err } + + return wait.NewResolver(c, mapper, pollInterval), nil } // CaptureIdentifiersFilter implements the Filter interface in the kio package. It @@ -55,8 +58,20 @@ func (f *CaptureIdentifiersFilter) Filter(slice []*yaml.RNode) ([]*yaml.RNode, e if err != nil { return nil, err } + // TODO(mortent): Update kyaml library id := meta.GetIdentifier() - f.Identifiers = append(f.Identifiers, &id) + gv, err := schema.ParseGroupVersion(id.APIVersion) + if err != nil { + return nil, err + } + f.Identifiers = append(f.Identifiers, wait.ResourceIdentifier{ + Name: id.Name, + Namespace: id.Namespace, + GroupKind: schema.GroupKind{ + Group: gv.Group, + Kind: id.Kind, + }, + }) } return slice, nil } diff --git a/cmd/resource/status/cmd/wait.go b/cmd/resource/status/cmd/wait.go index 0688b6436..ddd0e5425 100644 --- a/cmd/resource/status/cmd/wait.go +++ b/cmd/resource/status/cmd/wait.go @@ -19,7 +19,7 @@ import ( // GetWaitRunner return a command WaitRunner. func GetWaitRunner() *WaitRunner { r := &WaitRunner{ - createClientFunc: createClient, + newResolverFunc: newResolver, } c := &cobra.Command{ Use: "wait DIR...", @@ -51,7 +51,7 @@ type WaitRunner struct { Timeout time.Duration Command *cobra.Command - createClientFunc createClientFunc + newResolverFunc newResolverFunc } // runE implements the logic of the command and will call the Wait command in the wait @@ -59,12 +59,11 @@ type WaitRunner struct { // TablePrinter to display the information. func (r *WaitRunner) runE(c *cobra.Command, args []string) error { ctx := context.Background() - client, err := r.createClientFunc() - if err != nil { - return errors.Wrap(err, "error creating client") - } - resolver := wait.NewResolver(client, r.Interval) + resolver, err := r.newResolverFunc(r.Interval) + if err != nil { + return errors.Wrap(err, "errors creating resolver") + } captureFilter := &CaptureIdentifiersFilter{} filters := []kio.Filter{captureFilter} @@ -131,10 +130,9 @@ func (r *ResourceStatusCollector) updateResourceStatus(msg wait.Event) { r.AggregateStatus = msg.AggregateStatus eventResource := msg.EventResource for _, resourceState := range r.ResourceStatuses { - if resourceState.Identifier.GetAPIVersion() == eventResource.Identifier.GetAPIVersion() && - resourceState.Identifier.GetKind() == eventResource.Identifier.GetKind() && - resourceState.Identifier.GetNamespace() == eventResource.Identifier.GetNamespace() && - resourceState.Identifier.GetName() == eventResource.Identifier.GetName() { + if resourceState.Identifier.GroupKind == eventResource.ResourceIdentifier.GroupKind && + resourceState.Identifier.Namespace == eventResource.ResourceIdentifier.Namespace && + resourceState.Identifier.Name == eventResource.ResourceIdentifier.Name { resourceState.Status = eventResource.Status resourceState.Message = eventResource.Message } diff --git a/cmd/resource/status/cmd/wait_test.go b/cmd/resource/status/cmd/wait_test.go index e74100b51..2b77747f6 100644 --- a/cmd/resource/status/cmd/wait_test.go +++ b/cmd/resource/status/cmd/wait_test.go @@ -8,6 +8,8 @@ import ( "github.com/acarl005/stripansi" "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/kustomize/kstatus/status" ) @@ -18,7 +20,7 @@ func TestWaitNoResources(t *testing.T) { fakeClient := &FakeClient{} r := GetWaitRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -72,7 +74,7 @@ metadata: } r := GetWaitRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment")) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) @@ -144,7 +146,8 @@ items: } r := GetWaitRunner() - r.createClientFunc = newClientFunc(fakeClient) + r.newResolverFunc = fakeResolver(fakeClient, corev1.SchemeGroupVersion.WithKind("Pod"), + corev1.SchemeGroupVersion.WithKind("Service")) r.Command.SetArgs([]string{}) r.Command.SetIn(inBuffer) r.Command.SetOut(outBuffer) diff --git a/kstatus/wait/util.go b/kstatus/wait/util.go index 531df4b17..8000b4376 100644 --- a/kstatus/wait/util.go +++ b/kstatus/wait/util.go @@ -5,24 +5,28 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -// keyFromResourceIdentifier creates a resourceKey from a ResourceIdentifier. -func keyFromResourceIdentifier(i ResourceIdentifier) resourceKey { - return resourceKey{ - apiVersion: i.GetAPIVersion(), - kind: i.GetKind(), - name: i.GetName(), - namespace: i.GetNamespace(), +func resourceIdentifierFromObject(object KubernetesObject) ResourceIdentifier { + return ResourceIdentifier{ + Name: object.GetName(), + Namespace: object.GetNamespace(), + GroupKind: object.GroupVersionKind().GroupKind(), } } -// keyFromObject creates a resourceKey from an Object. -func keyFromObject(obj runtime.Object) resourceKey { - gvk := obj.GetObjectKind().GroupVersionKind() - r := obj.(metav1.Object) - return resourceKey{ - apiVersion: gvk.GroupVersion().String(), - kind: gvk.Kind, - name: r.GetName(), - namespace: r.GetNamespace(), +func resourceIdentifiersFromObjects(objects []KubernetesObject) []ResourceIdentifier { + var resourceIdentifiers []ResourceIdentifier + for _, object := range objects { + resourceIdentifiers = append(resourceIdentifiers, resourceIdentifierFromObject(object)) + } + return resourceIdentifiers +} + +func resourceIdentifierFromRuntimeObject(object runtime.Object) ResourceIdentifier { + gvk := object.GetObjectKind().GroupVersionKind() + r := object.(metav1.Object) + return ResourceIdentifier{ + GroupKind: gvk.GroupKind(), + Name: r.GetName(), + Namespace: r.GetNamespace(), } } diff --git a/kstatus/wait/wait.go b/kstatus/wait/wait.go index af4585012..873c596e5 100644 --- a/kstatus/wait/wait.go +++ b/kstatus/wait/wait.go @@ -10,28 +10,59 @@ import ( "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/kustomize/kstatus/status" ) +const ( + defaultNamespace = "default" +) + // ResourceIdentifier defines the functions needed to identify // a resource in a cluster. This interface is implemented by // both unstructured.Unstructured and the standard Kubernetes types. -type ResourceIdentifier interface { +type KubernetesObject interface { GetName() string GetNamespace() string - GetAPIVersion() string - GetKind() string + GroupVersionKind() schema.GroupVersionKind +} + +// ResourceIdentifier contains the information needed to uniquely +// identify a resource in a cluster. +type ResourceIdentifier struct { + Name string + Namespace string + GroupKind schema.GroupKind +} + +// Equals compares two ResourceIdentifiers and returns true if they +// refer to the same resource. Special handling is needed for namespace +// since an empty namespace for a namespace-scoped resource is defaulted +// to the "default" namespace. +func (r ResourceIdentifier) Equals(other ResourceIdentifier) bool { + isSameNamespace := r.Namespace == other.Namespace || + (r.Namespace == "" && other.Namespace == defaultNamespace) || + (r.Namespace == defaultNamespace && other.Namespace == "") + return r.GroupKind == other.GroupKind && + r.Name == other.Name && + isSameNamespace } // Resolver provides the functions for resolving status of a list of resources. type Resolver struct { - // DynamicClient is the client used to talk - // with the cluster + // client is the client used to talk + // with the cluster. It uses the Reader interface + // from controller-runtime. client client.Reader + // mapper is the RESTMapper needed to look up mappings + // for resource types. + mapper meta.RESTMapper + // statusComputeFunc defines which function should be used for computing // the status of a resource. This is available for testing purposes. statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error) @@ -44,9 +75,10 @@ type Resolver struct { // NewResolver creates a new resolver with the provided client. Fetching // and polling of resources will be done using the provided client. -func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver { +func NewResolver(client client.Reader, mapper meta.RESTMapper, pollInterval time.Duration) *Resolver { return &Resolver{ client: client, + mapper: mapper, statusComputeFunc: status.Compute, pollInterval: pollInterval, } @@ -58,24 +90,31 @@ func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver { type ResourceResult struct { Result *status.Result - Resource ResourceIdentifier + ResourceIdentifier ResourceIdentifier Error error } -// FetchAndResolve returns the status for a list of resources. It will return -// the status for each of them individually. The slice of ResourceIdentifiers will -// only be used to get the information needed to fetch the updated state of -// the resources from the cluster. -func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIdentifier) []ResourceResult { +// FetchAndResolveObjects returns the status for a list of kubernetes objects. These can be provided +// either as Unstructured resources or the specific resource types. It will return the status for each +// of them individually. The provided resources will only be used to get the information needed to +// fetch the updated state of the resources from the cluster. +func (r *Resolver) FetchAndResolveObjects(ctx context.Context, objects []KubernetesObject) []ResourceResult { + resourceIds := resourceIdentifiersFromObjects(objects) + return r.FetchAndResolve(ctx, resourceIds) +} + +// FetchAndResolve returns the status for a list of ResourceIdentifiers. It will return +// the status for each of them individually. +func (r *Resolver) FetchAndResolve(ctx context.Context, resourceIDs []ResourceIdentifier) []ResourceResult { var results []ResourceResult - for _, resource := range resources { - u, err := r.fetchResource(ctx, resource) + for _, resourceID := range resourceIDs { + u, err := r.fetchResource(ctx, resourceID) if err != nil { if k8serrors.IsNotFound(errors.Cause(err)) { results = append(results, ResourceResult{ - Resource: resource, + ResourceIdentifier: resourceID, Result: &status.Result{ Status: status.CurrentStatus, Message: "Resource does not exist", @@ -87,17 +126,17 @@ func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIden Status: status.UnknownStatus, Message: fmt.Sprintf("Error fetching resource from cluster: %v", err), }, - Resource: resource, - Error: err, + ResourceIdentifier: resourceID, + Error: err, }) } continue } res, err := r.statusComputeFunc(u) results = append(results, ResourceResult{ - Result: res, - Resource: resource, - Error: err, + Result: res, + ResourceIdentifier: resourceID, + Error: err, }) } @@ -139,7 +178,7 @@ const ( type EventResource struct { // Identifier contains information that identifies which resource // this information is about. - Identifier ResourceIdentifier + ResourceIdentifier ResourceIdentifier // Status is the latest status for the given resource. Status status.Status @@ -153,9 +192,18 @@ type EventResource struct { Error error } -// WaitForStatus polls all the provided resources until all of them has -// reached the Current status. Updates the channel as resources change their status and -// when the wait is either completed or aborted. +// WaitForStatus polls all the provided resources until all of them have reached the Current +// status or the timeout specified through the context is reached. Updates on the status +// of individual resources and the aggregate status is provided through the Event channel. +func (r *Resolver) WaitForStatusOfObjects(ctx context.Context, objects []KubernetesObject) <-chan Event { + resourceIds := resourceIdentifiersFromObjects(objects) + return r.WaitForStatus(ctx, resourceIds) +} + +// WaitForStatus polls all the resources references by the provided ResourceIdentifiers until +// all of them have reached the Current status or the timeout specified through the context is +// reached. Updates on the status of individual resources and the aggregate status is provided +// through the Event channel. func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event { eventChan := make(chan Event) @@ -225,12 +273,11 @@ func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdenti // Completed type event. If the aggregate status has become Current, this function // will return true to signal that it is done. func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, eventChan chan Event) bool { - for id := range waitState.ResourceWaitStates { + for resourceID := range waitState.ResourceWaitStates { // Make sure we have a local copy since we are passing // pointers to this variable as parameters to functions - identifier := id - u, err := r.fetchResource(ctx, &identifier) - eventResource, updateObserved := waitState.ResourceObserved(&identifier, u, err) + u, err := r.fetchResource(ctx, resourceID) + eventResource, updateObserved := waitState.ResourceObserved(resourceID, u, err) // Find the aggregate status based on the new state for this resource. aggStatus := waitState.AggregateStatus() // We want events for changes in status for each resource, so send @@ -259,15 +306,25 @@ func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, // through the client available in the Resolver. It returns the resource // as an Unstructured. func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) { - key := types.NamespacedName{Name: identifier.GetName(), Namespace: identifier.GetNamespace()} - u := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": identifier.GetAPIVersion(), - "kind": identifier.GetKind(), - }, + // We need to look up the preferred version for the GroupKind and + // whether the resource type is cluster scoped. We look this + // up with the RESTMapper. + mapping, err := r.mapper.RESTMapping(identifier.GroupKind) + if err != nil { + return nil, err } - err := r.client.Get(ctx, key, u) - //return u, err + + // Resources might not have the namespace set, which means we need to set + // it to `default` if the resource is namespace scoped. + namespace := identifier.Namespace + if namespace == "" && mapping.Scope.Name() == meta.RESTScopeNameNamespace { + namespace = defaultNamespace + } + + key := types.NamespacedName{Name: identifier.Name, Namespace: namespace} + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(mapping.GroupVersionKind) + err = r.client.Get(ctx, key, u) if err != nil { return nil, errors.Wrap(err, "error fetching resource from cluster") } diff --git a/kstatus/wait/wait_test.go b/kstatus/wait/wait_test.go index 5e22bebd1..c11f8b845 100644 --- a/kstatus/wait/wait_test.go +++ b/kstatus/wait/wait_test.go @@ -10,9 +10,11 @@ import ( "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -32,6 +34,7 @@ func TestFetchAndResolve(t *testing.T) { testCases := map[string]struct { resources []runtime.Object + mapperGVKs []schema.GroupVersionKind expectedResults []result }{ "no resources": { @@ -52,6 +55,9 @@ func TestFetchAndResolve(t *testing.T) { }, }, }, + mapperGVKs: []schema.GroupVersionKind{ + appsv1.SchemeGroupVersion.WithKind("Deployment"), + }, expectedResults: []result{ { status: status.InProgressStatus, @@ -92,6 +98,10 @@ func TestFetchAndResolve(t *testing.T) { }, }, }, + mapperGVKs: []schema.GroupVersionKind{ + appsv1.SchemeGroupVersion.WithKind("StatefulSet"), + corev1.SchemeGroupVersion.WithKind("Secret"), + }, expectedResults: []result{ { status: status.CurrentStatus, @@ -109,18 +119,18 @@ func TestFetchAndResolve(t *testing.T) { tc := tc t.Run(tn, func(t *testing.T) { fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, tc.resources...) - resolver := NewResolver(fakeClient, testPollInterval) + + resolver := NewResolver(fakeClient, newRESTMapper(tc.mapperGVKs...), testPollInterval) resolver.statusComputeFunc = status.Compute var identifiers []ResourceIdentifier for _, resource := range tc.resources { gvk := resource.GetObjectKind().GroupVersionKind() r := resource.(metav1.Object) - identifiers = append(identifiers, &resourceKey{ - name: r.GetName(), - namespace: r.GetNamespace(), - apiVersion: gvk.GroupVersion().String(), - kind: gvk.Kind, + identifiers = append(identifiers, ResourceIdentifier{ + Name: r.GetName(), + Namespace: r.GetNamespace(), + GroupKind: gvk.GroupKind(), }) } @@ -128,7 +138,7 @@ func TestFetchAndResolve(t *testing.T) { for i, res := range results { id := identifiers[i] expectedRes := tc.expectedResults[i] - rid := fmt.Sprintf("%s/%s", id.GetNamespace(), id.GetName()) + rid := fmt.Sprintf("%s/%s", id.Namespace, id.Name) if expectedRes.error { if res.Error == nil { t.Errorf("expected error for resource %s, but didn't get one", rid) @@ -150,13 +160,15 @@ func TestFetchAndResolve(t *testing.T) { func TestFetchAndResolveUnknownResource(t *testing.T) { fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme) - resolver := NewResolver(fakeClient, testPollInterval) + resolver := NewResolver(fakeClient, newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")), testPollInterval) results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ - &resourceKey{ - apiVersion: "apps/v1", - kind: "Deploymnet", - name: "myDeployment", - namespace: "default", + { + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Name: "myDeployment", + Namespace: "default", }, }) @@ -181,14 +193,17 @@ func TestFetchAndResolveWithFetchError(t *testing.T) { &fakeReader{ Err: expectedError, }, + newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")), testPollInterval, ) results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ - &resourceKey{ - apiVersion: "apps/v1", - kind: "Deploymnet", - name: "myDeployment", - namespace: "default", + { + GroupKind: schema.GroupKind{ + Group: "apps", + Kind: "Deployment", + }, + Name: "myDeployment", + Namespace: "default", }, }) @@ -222,7 +237,7 @@ func TestFetchAndResolveComputeStatusError(t *testing.T) { } fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, resource) - resolver := NewResolver(fakeClient, testPollInterval) + resolver := NewResolver(fakeClient, newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")), testPollInterval) resolver.statusComputeFunc = func(u *unstructured.Unstructured) (*status.Result, error) { return &status.Result{ @@ -231,11 +246,13 @@ func TestFetchAndResolveComputeStatusError(t *testing.T) { }, expectedError } results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ - &resourceKey{ - apiVersion: resource.APIVersion, - kind: resource.Kind, - name: resource.GetName(), - namespace: resource.GetNamespace(), + { + GroupKind: schema.GroupKind{ + Group: resource.GroupVersionKind().Group, + Kind: resource.Kind, + }, + Name: resource.GetName(), + Namespace: resource.GetNamespace(), }, }) @@ -358,23 +375,28 @@ func TestWaitForStatus(t *testing.T) { tc := tc t.Run(tn, func(t *testing.T) { var objs []runtime.Object - statusResults := make(map[resourceKey][]*status.Result) + statusResults := make(map[ResourceIdentifier][]*status.Result) var identifiers []ResourceIdentifier for obj, statuses := range tc.resources { objs = append(objs, obj) - identifier := keyFromObject(obj) - identifiers = append(identifiers, &identifier) + identifier := resourceIdentifierFromRuntimeObject(obj) + identifiers = append(identifiers, identifier) statusResults[identifier] = statuses } statusComputer := statusComputer{ results: statusResults, - resourceCallCount: make(map[resourceKey]int), + resourceCallCount: make(map[ResourceIdentifier]int), } resolver := &Resolver{ - client: fake.NewFakeClientWithScheme(scheme.Scheme, objs...), + client: fake.NewFakeClientWithScheme(scheme.Scheme, objs...), + mapper: newRESTMapper( + appsv1.SchemeGroupVersion.WithKind("Deployment"), + appsv1.SchemeGroupVersion.WithKind("StatefulSet"), + corev1.SchemeGroupVersion.WithKind("Service"), + ), statusComputeFunc: statusComputer.Compute, pollInterval: testPollInterval, } @@ -397,20 +419,20 @@ func TestWaitForStatus(t *testing.T) { } var aggregateStatuses []status.Status - resourceStatuses := make(map[resourceKey][]status.Status) + resourceStatuses := make(map[ResourceIdentifier][]status.Status) for _, e := range events { aggregateStatuses = append(aggregateStatuses, e.AggregateStatus) if e.EventResource != nil { - identifier := keyFromResourceIdentifier(e.EventResource.Identifier) + identifier := e.EventResource.ResourceIdentifier resourceStatuses[identifier] = append(resourceStatuses[identifier], e.EventResource.Status) } } for resource, expectedStatuses := range tc.expectedResourceStatuses { - identifier := keyFromObject(resource) + identifier := resourceIdentifierFromRuntimeObject(resource) actualStatuses := resourceStatuses[identifier] if !reflect.DeepEqual(expectedStatuses, actualStatuses) { - t.Errorf("expected statuses %v for resource %s/%s, but got %v", expectedStatuses, identifier.namespace, identifier.name, actualStatuses) + t.Errorf("expected statuses %v for resource %s/%s, but got %v", expectedStatuses, identifier.Namespace, identifier.Name, actualStatuses) } } @@ -423,21 +445,25 @@ func TestWaitForStatus(t *testing.T) { func TestWaitForStatusDeletedResources(t *testing.T) { statusComputer := statusComputer{ - results: make(map[resourceKey][]*status.Result), - resourceCallCount: make(map[resourceKey]int), + results: make(map[ResourceIdentifier][]*status.Result), + resourceCallCount: make(map[ResourceIdentifier]int), } resolver := &Resolver{ - client: fake.NewFakeClientWithScheme(scheme.Scheme), + client: fake.NewFakeClientWithScheme(scheme.Scheme), + mapper: newRESTMapper( + appsv1.SchemeGroupVersion.WithKind("Deployment"), + corev1.SchemeGroupVersion.WithKind("Service"), + ), statusComputeFunc: statusComputer.Compute, pollInterval: testPollInterval, } - depResourceIdentifier := keyFromObject(deploymentResource) - serviceResourceIdentifier := keyFromObject(serviceResource) + depResourceIdentifier := resourceIdentifierFromRuntimeObject(deploymentResource) + serviceResourceIdentifier := resourceIdentifierFromRuntimeObject(serviceResource) identifiers := []ResourceIdentifier{ - &depResourceIdentifier, - &serviceResourceIdentifier, + depResourceIdentifier, + serviceResourceIdentifier, } eventChan := resolver.WaitForStatus(context.TODO(), identifiers) @@ -499,17 +525,12 @@ loop: type statusComputer struct { t *testing.T - results map[resourceKey][]*status.Result - resourceCallCount map[resourceKey]int + results map[ResourceIdentifier][]*status.Result + resourceCallCount map[ResourceIdentifier]int } func (s *statusComputer) Compute(u *unstructured.Unstructured) (*status.Result, error) { - identifier := resourceKey{ - apiVersion: u.GetAPIVersion(), - kind: u.GetKind(), - name: u.GetName(), - namespace: u.GetNamespace(), - } + identifier := resourceIdentifierFromRuntimeObject(u) resourceResults, ok := s.results[identifier] if !ok { @@ -559,3 +580,15 @@ var serviceResource = &corev1.Service{ Namespace: "default", }, } + +func newRESTMapper(gvks ...schema.GroupVersionKind) meta.RESTMapper { + var groupVersions []schema.GroupVersion + for _, gvk := range gvks { + groupVersions = append(groupVersions, gvk.GroupVersion()) + } + mapper := meta.NewDefaultRESTMapper(groupVersions) + for _, gvk := range gvks { + mapper.Add(gvk, meta.RESTScopeNamespace) + } + return mapper +} diff --git a/kstatus/wait/waitstate.go b/kstatus/wait/waitstate.go index a865938a6..ac1315c32 100644 --- a/kstatus/wait/waitstate.go +++ b/kstatus/wait/waitstate.go @@ -10,41 +10,12 @@ import ( "sigs.k8s.io/kustomize/kstatus/status" ) -// resourceKey is a minimal implementation of -// the ResourceIdentifier interface. -type resourceKey struct { - name string - namespace string - apiVersion string - kind string -} - -// GetName returns the name of the resource. -func (r *resourceKey) GetName() string { - return r.name -} - -// GetNamespace returns the namespace of the resource. -func (r *resourceKey) GetNamespace() string { - return r.namespace -} - -// GetAPIVersion returns the API version of the resource. -func (r *resourceKey) GetAPIVersion() string { - return r.apiVersion -} - -// GetKind returns the Kind of the resource. -func (r *resourceKey) GetKind() string { - return r.kind -} - // waitState keeps the state about the resources and their last // observed state. This is used to determine any changes in state // so events can be sent when needed. type waitState struct { // ResourceWaitStates contains wait state for each of the resources. - ResourceWaitStates map[resourceKey]*resourceWaitState + ResourceWaitStates map[ResourceIdentifier]*resourceWaitState // statusComputeFunc defines the function used to compute the state of // a single resource. This is available for testing purposes. @@ -62,17 +33,11 @@ type resourceWaitState struct { // newWaitState creates a new waitState object and initializes it with the // provided slice of resources and the provided statusComputeFunc. -func newWaitState(resources []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState { - resourceWaitStates := make(map[resourceKey]*resourceWaitState) +func newWaitState(resourceIDs []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState { + resourceWaitStates := make(map[ResourceIdentifier]*resourceWaitState) - for _, r := range resources { - identifier := resourceKey{ - apiVersion: r.GetAPIVersion(), - kind: r.GetKind(), - name: r.GetName(), - namespace: r.GetNamespace(), - } - resourceWaitStates[identifier] = &resourceWaitState{} + for _, resourceID := range resourceIDs { + resourceWaitStates[resourceID] = &resourceWaitState{} } return &waitState{ @@ -107,19 +72,12 @@ func (w *waitState) AggregateStatus() status.Status { // that will be true if the status of the observed resource has changed // since the previous observation and false it not. This is used to determine // whether a new event should be sent based on this observation. -func (w *waitState) ResourceObserved(id ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) { - identifier := resourceKey{ - name: id.GetName(), - namespace: id.GetNamespace(), - apiVersion: id.GetAPIVersion(), - kind: id.GetKind(), - } - +func (w *waitState) ResourceObserved(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) { // Check for nil is not needed here as the id passed in comes // from iterating over the keys of the map. - rws := w.ResourceWaitStates[identifier] + rws := w.ResourceWaitStates[resourceID] - eventResource := w.getEventResource(identifier, resource, err) + eventResource := w.getEventResource(resourceID, resource, err) // If the new eventResource is identical to the previous one, we return // with the last return value indicating this is not a new event. if rws.LastEvent != nil && reflect.DeepEqual(eventResource, *rws.LastEvent) { @@ -133,22 +91,22 @@ func (w *waitState) ResourceObserved(id ResourceIdentifier, resource *unstructur // the provided resourceKey. The EventResource contains information about the // latest status for the given resource, so it computes status for the resource // as well as check for deletion. -func (w *waitState) getEventResource(identifier resourceKey, resource *unstructured.Unstructured, err error) EventResource { +func (w *waitState) getEventResource(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) EventResource { // Get the resourceWaitState for this resource. It contains information // of the previous observed statuses. We don't need to check for nil here // as the identifier comes from iterating over the keys of the // ResourceWaitState map. - r := w.ResourceWaitStates[identifier] + r := w.ResourceWaitStates[resourceID] // If fetching the resource from the cluster failed, we don't really // know anything about the status of the resource, so simply // report the status as Unknown. if err != nil && !k8serrors.IsNotFound(errors.Cause(err)) { return EventResource{ - Identifier: &identifier, - Status: status.UnknownStatus, - Message: fmt.Sprintf("Error: %s", err), - Error: err, + ResourceIdentifier: resourceID, + Status: status.UnknownStatus, + Message: fmt.Sprintf("Error: %s", err), + Error: err, } } @@ -161,9 +119,9 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu if k8serrors.IsNotFound(errors.Cause(err)) { r.HasBeenCurrent = true return EventResource{ - Identifier: &identifier, - Status: status.CurrentStatus, - Message: fmt.Sprintf("Resource has been deleted"), + ResourceIdentifier: resourceID, + Status: status.CurrentStatus, + Message: fmt.Sprintf("Resource has been deleted"), } } @@ -177,9 +135,9 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu if resource.GetDeletionTimestamp() != nil { return EventResource{ - Identifier: &identifier, - Status: status.TerminatingStatus, - Message: fmt.Sprintf("Resource is terminating"), + ResourceIdentifier: resourceID, + Status: status.TerminatingStatus, + Message: fmt.Sprintf("Resource is terminating"), } } @@ -188,10 +146,10 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu // as Unknown. if err != nil { return EventResource{ - Identifier: &identifier, - Status: status.UnknownStatus, - Message: fmt.Sprintf("Error: %s", err), - Error: err, + ResourceIdentifier: resourceID, + Status: status.UnknownStatus, + Message: fmt.Sprintf("Error: %s", err), + Error: err, } } @@ -204,8 +162,8 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu } return EventResource{ - Identifier: &identifier, - Status: statusResult.Status, - Message: statusResult.Message, + ResourceIdentifier: resourceID, + Status: statusResult.Status, + Message: statusResult.Message, } }