Merge pull request #2081 from mortent/FixDefaultNamespaceIssue

Change the ResourceIdentifier used in kstatus to use only Group instead of GroupVersion
This commit is contained in:
Kubernetes Prow Robot
2020-01-14 20:53:31 -08:00
committed by GitHub
14 changed files with 331 additions and 240 deletions

View File

@@ -10,8 +10,8 @@ import (
"time" "time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/cmd/util"
@@ -47,7 +47,12 @@ func (s *StatusOptions) AddFlags(c *cobra.Command) {
} }
func (s *StatusOptions) waitForStatus(infos []*resource.Info) error { func (s *StatusOptions) waitForStatus(infos []*resource.Info) error {
c, err := getClient(s.factory) mapper, err := getRESTMapper(s.factory)
if err != nil {
return err
}
c, err := getClient(s.factory, mapper)
if err != nil { if err != nil {
return err return err
} }
@@ -55,16 +60,15 @@ func (s *StatusOptions) waitForStatus(infos []*resource.Info) error {
ctx, cancel := context.WithTimeout(context.Background(), s.timeout) ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel() defer cancel()
resolver := wait.NewResolver(c, s.period) resolver := wait.NewResolver(c, mapper, s.period)
ch := resolver.WaitForStatus(ctx, infosToResourceIdentifiers(infos)) ch := resolver.WaitForStatus(ctx, infosToResourceIdentifiers(infos))
for msg := range ch { for msg := range ch {
switch msg.Type { switch msg.Type {
case wait.ResourceUpdate: case wait.ResourceUpdate:
id := msg.EventResource.Identifier id := msg.EventResource.ResourceIdentifier
gv, _ := schema.ParseGroupVersion(id.GetAPIVersion()) gk := id.GroupKind
gvk := gv.WithKind(id.GetKind()) fmt.Fprintf(s.ioStreams.Out, "%s/%s is %s: %s\n", strings.ToLower(gk.String()), id.Name, msg.EventResource.Status.String(), msg.EventResource.Message)
fmt.Fprintf(s.ioStreams.Out, "%s/%s is %s: %s\n", strings.ToLower(gvk.GroupKind().String()), id.GetName(), msg.EventResource.Status.String(), msg.EventResource.Message)
case wait.Completed: case wait.Completed:
fmt.Fprint(s.ioStreams.Out, "all resources has reached the Current status\n") fmt.Fprint(s.ioStreams.Out, "all resources has reached the Current status\n")
case wait.Aborted: case wait.Aborted:
@@ -78,18 +82,21 @@ func infosToResourceIdentifiers(infos []*resource.Info) []wait.ResourceIdentifie
var resources []wait.ResourceIdentifier var resources []wait.ResourceIdentifier
for _, info := range infos { for _, info := range infos {
u := info.Object.(*unstructured.Unstructured) u := info.Object.(*unstructured.Unstructured)
resources = append(resources, u) resources = append(resources, wait.ResourceIdentifier{
GroupKind: u.GroupVersionKind().GroupKind(),
Namespace: u.GetNamespace(),
Name: u.GetName(),
})
} }
return resources return resources
} }
func getClient(f util.Factory) (client.Reader, error) { func getRESTMapper(f util.Factory) (meta.RESTMapper, error) {
config, err := f.ToRESTConfig() return f.ToRESTMapper()
if err != nil {
return nil, err
} }
mapper, err := f.ToRESTMapper() func getClient(f util.Factory, mapper meta.RESTMapper) (client.Reader, error) {
config, err := f.ToRESTConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -10,14 +10,13 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"sigs.k8s.io/kustomize/cmd/resource/status/generateddocs/commands" "sigs.k8s.io/kustomize/cmd/resource/status/generateddocs/commands"
"sigs.k8s.io/kustomize/kstatus/wait"
"sigs.k8s.io/kustomize/kyaml/kio" "sigs.k8s.io/kustomize/kyaml/kio"
) )
// GetEventsRunner returns a command EventsRunner. // GetEventsRunner returns a command EventsRunner.
func GetEventsRunner() *EventsRunner { func GetEventsRunner() *EventsRunner {
r := &EventsRunner{ r := &EventsRunner{
createClientFunc: createClient, newResolverFunc: newResolver,
} }
c := &cobra.Command{ c := &cobra.Command{
Use: "events DIR...", Use: "events DIR...",
@@ -49,18 +48,16 @@ type EventsRunner struct {
Timeout time.Duration Timeout time.Duration
Command *cobra.Command Command *cobra.Command
createClientFunc createClientFunc newResolverFunc newResolverFunc
} }
func (r *EventsRunner) runE(c *cobra.Command, args []string) error { func (r *EventsRunner) runE(c *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
// Create a client and use it to set up a new resolver. resolver, err := r.newResolverFunc(r.Interval)
client, err := r.createClientFunc()
if err != nil { if err != nil {
return errors.Wrap(err, "error creating client") return errors.Wrap(err, "error creating resolver")
} }
resolver := wait.NewResolver(client, r.Interval)
// Set up a CaptureIdentifierFilter and run all inputs through the // Set up a CaptureIdentifierFilter and run all inputs through the
// filter with the pipeline to capture the inventory of resources // filter with the pipeline to capture the inventory of resources

View File

@@ -9,6 +9,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/status"
"sigs.k8s.io/kustomize/kstatus/wait" "sigs.k8s.io/kustomize/kstatus/wait"
) )
@@ -20,7 +22,7 @@ func TestEventsNoResources(t *testing.T) {
fakeClient := &FakeClient{} fakeClient := &FakeClient{}
r := GetEventsRunner() r := GetEventsRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient)
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -64,7 +66,7 @@ metadata:
} }
r := GetEventsRunner() r := GetEventsRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment"))
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -141,7 +143,8 @@ items:
} }
r := GetEventsRunner() r := GetEventsRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, corev1.SchemeGroupVersion.WithKind("Pod"),
corev1.SchemeGroupVersion.WithKind("Service"))
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)

View File

@@ -18,7 +18,7 @@ import (
// GetFetchRunner returns a command FetchRunner. // GetFetchRunner returns a command FetchRunner.
func GetFetchRunner() *FetchRunner { func GetFetchRunner() *FetchRunner {
r := &FetchRunner{ r := &FetchRunner{
createClientFunc: createClient, newResolverFunc: newResolver,
} }
c := &cobra.Command{ c := &cobra.Command{
Use: "fetch DIR...", Use: "fetch DIR...",
@@ -44,20 +44,17 @@ type FetchRunner struct {
IncludeSubpackages bool IncludeSubpackages bool
Command *cobra.Command Command *cobra.Command
createClientFunc createClientFunc newResolverFunc newResolverFunc
} }
func (r *FetchRunner) runE(c *cobra.Command, args []string) error { func (r *FetchRunner) runE(c *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
// Create a new client and use it to set up a resolver. resolver, err := r.newResolverFunc(time.Minute)
k8sClient, err := r.createClientFunc()
if err != nil { if err != nil {
return errors.Wrap(err, "error creating k8sClient") return errors.Wrap(err, "error creating resolver")
} }
resolver := wait.NewResolver(k8sClient, time.Minute)
// Set up a CaptureIdentifierFilter and run all inputs through the // Set up a CaptureIdentifierFilter and run all inputs through the
// filter with the pipeline to capture the inventory of resources // filter with the pipeline to capture the inventory of resources
// which we are interested in. // which we are interested in.
@@ -108,7 +105,7 @@ func (f FetchStatusInfo) CurrentStatus() StatusData {
var resourceData []ResourceStatusData var resourceData []ResourceStatusData
for _, res := range f.Results { for _, res := range f.Results {
rsd := ResourceStatusData{ rsd := ResourceStatusData{
Identifier: res.Resource, Identifier: res.ResourceIdentifier,
} }
if res.Error != nil { if res.Error != nil {
rsd.Status = status.UnknownStatus rsd.Status = status.UnknownStatus

View File

@@ -25,7 +25,7 @@ func TestEmptyManifest(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(scheme) fakeClient := fake.NewFakeClientWithScheme(scheme)
r := GetFetchRunner() r := GetFetchRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient)
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -65,7 +65,7 @@ metadata:
fakeClient := fake.NewFakeClientWithScheme(scheme, deployment) fakeClient := fake.NewFakeClientWithScheme(scheme, deployment)
r := GetFetchRunner() r := GetFetchRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment"))
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -78,7 +78,7 @@ metadata:
tableOutput := parseTableOutput(t, cleanOutput) tableOutput := parseTableOutput(t, cleanOutput)
expectedResource := ResourceIdentifier{ expectedResource := ResourceIdentifier{
apiVersion: "apps/v1", apiVersion: "apps",
kind: "Deployment", kind: "Deployment",
namespace: "default", namespace: "default",
name: "bar", name: "bar",
@@ -139,7 +139,8 @@ metadata:
outBuffer := &bytes.Buffer{} outBuffer := &bytes.Buffer{}
r := GetFetchRunner() r := GetFetchRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment"),
v1.SchemeGroupVersion.WithKind("Service"))
r.Command.SetArgs([]string{d}) r.Command.SetArgs([]string{d})
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -152,7 +153,7 @@ metadata:
tableOutput := parseTableOutput(t, cleanOutput) tableOutput := parseTableOutput(t, cleanOutput)
expectedDeploymentResource := ResourceIdentifier{ expectedDeploymentResource := ResourceIdentifier{
apiVersion: "apps/v1", apiVersion: "apps",
kind: "Deployment", kind: "Deployment",
namespace: "default", namespace: "default",
name: "foo", name: "foo",
@@ -162,7 +163,7 @@ metadata:
verifyOutputContains(t, tableOutput, expectedDeploymentResource, expectedDeploymentStatus, expectedDeploymentMessage) verifyOutputContains(t, tableOutput, expectedDeploymentResource, expectedDeploymentStatus, expectedDeploymentMessage)
expectedServiceResource := ResourceIdentifier{ expectedServiceResource := ResourceIdentifier{
apiVersion: "v1", apiVersion: "",
kind: "Service", kind: "Service",
namespace: "default", namespace: "default",
name: "foo", name: "foo",

View File

@@ -6,11 +6,15 @@ import (
"regexp" "regexp"
"strings" "strings"
"testing" "testing"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/status"
"sigs.k8s.io/kustomize/kstatus/wait"
) )
type TableOutput struct { type TableOutput struct {
@@ -232,10 +236,25 @@ func (f *FakeClient) Get(_ context.Context, _ client.ObjectKey, obj runtime.Obje
return callbackFunc(u) return callbackFunc(u)
} }
func (f *FakeClient) List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error { func (f *FakeClient) List(context.Context, runtime.Object, ...client.ListOption) error {
return nil return nil
} }
func fakeResolver(fakeClient client.Reader, mapperTypes ...schema.GroupVersionKind) newResolverFunc {
return func(pollInterval time.Duration) (*wait.Resolver, error) {
var groupVersions []schema.GroupVersion
for _, gvk := range mapperTypes {
groupVersions = append(groupVersions, gvk.GroupVersion())
}
mapper := meta.NewDefaultRESTMapper(groupVersions)
for _, gvk := range mapperTypes {
mapper.Add(gvk, meta.RESTScopeNamespace)
}
return wait.NewResolver(fakeClient, mapper, pollInterval), nil
}
}
func joinStatuses(statuses []status.Status) string { func joinStatuses(statuses []status.Status) string {
var stringStatuses []string var stringStatuses []string
for _, s := range statuses { for _, s := range statuses {

View File

@@ -61,8 +61,7 @@ var (
width: 25, width: 25,
colorFunc: defaultColorFunc, colorFunc: defaultColorFunc,
contentFunc: func(data ResourceStatusData) string { contentFunc: func(data ResourceStatusData) string {
return fmt.Sprintf("%s/%s", data.Identifier.GetAPIVersion(), return fmt.Sprintf("%s/%s", data.Identifier.GroupKind.Group, data.Identifier.GroupKind.Kind)
data.Identifier.GetKind())
}, },
}, },
namespaceColumn: { namespaceColumn: {
@@ -70,7 +69,7 @@ var (
width: 15, width: 15,
colorFunc: defaultColorFunc, colorFunc: defaultColorFunc,
contentFunc: func(data ResourceStatusData) string { contentFunc: func(data ResourceStatusData) string {
return data.Identifier.GetNamespace() return data.Identifier.Namespace
}, },
}, },
nameColumn: { nameColumn: {
@@ -78,7 +77,7 @@ var (
width: 20, width: 20,
colorFunc: defaultColorFunc, colorFunc: defaultColorFunc,
contentFunc: func(data ResourceStatusData) string { contentFunc: func(data ResourceStatusData) string {
return data.Identifier.GetName() return data.Identifier.Name
}, },
}, },
statusColumn: { statusColumn: {
@@ -255,8 +254,8 @@ var (
width: 20, width: 20,
requireResourceUpdateEvent: true, requireResourceUpdateEvent: true,
contentFunc: func(event wait.Event) string { contentFunc: func(event wait.Event) string {
return fmt.Sprintf("%s/%s", event.EventResource.Identifier.GetAPIVersion(), return fmt.Sprintf("%s/%s", event.EventResource.ResourceIdentifier.GroupKind.Group,
event.EventResource.Identifier.GetKind()) event.EventResource.ResourceIdentifier.GroupKind.Kind)
}, },
}, },
{ {
@@ -264,7 +263,7 @@ var (
width: 15, width: 15,
requireResourceUpdateEvent: true, requireResourceUpdateEvent: true,
contentFunc: func(event wait.Event) string { contentFunc: func(event wait.Event) string {
return event.EventResource.Identifier.GetNamespace() return event.EventResource.ResourceIdentifier.Namespace
}, },
}, },
{ {
@@ -272,7 +271,7 @@ var (
width: 20, width: 20,
requireResourceUpdateEvent: true, requireResourceUpdateEvent: true,
contentFunc: func(event wait.Event) string { contentFunc: func(event wait.Event) string {
return event.EventResource.Identifier.GetName() return event.EventResource.ResourceIdentifier.Name
}, },
}, },
{ {

View File

@@ -4,7 +4,10 @@
package cmd package cmd
import ( import (
"time"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
@@ -22,23 +25,23 @@ func init() {
_ = clientgoscheme.AddToScheme(scheme) _ = clientgoscheme.AddToScheme(scheme)
} }
type createClientFunc func() (client.Reader, error) type newResolverFunc func(pollInterval time.Duration) (*wait.Resolver, error)
// createClient returns a client for talking to a Kubernetes cluster. The client // newResolver returns a new resolver that can resolve status for resources based
// is from controller-runtime. // on polling the cluster.
func createClient() (client.Reader, error) { func newResolver(pollInterval time.Duration) (*wait.Resolver, error) {
config := ctrl.GetConfigOrDie() config := ctrl.GetConfigOrDie()
mapper, err := apiutil.NewDiscoveryRESTMapper(config) mapper, err := apiutil.NewDiscoveryRESTMapper(config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.New(config, client.Options{Scheme: scheme, Mapper: mapper})
c, err := client.New(config, client.Options{Scheme: scheme, Mapper: mapper})
if err != nil {
return nil, err
} }
func newClientFunc(c client.Reader) func() (client.Reader, error) { return wait.NewResolver(c, mapper, pollInterval), nil
return func() (client.Reader, error) {
return c, nil
}
} }
// CaptureIdentifiersFilter implements the Filter interface in the kio package. It // CaptureIdentifiersFilter implements the Filter interface in the kio package. It
@@ -55,8 +58,20 @@ func (f *CaptureIdentifiersFilter) Filter(slice []*yaml.RNode) ([]*yaml.RNode, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(mortent): Update kyaml library
id := meta.GetIdentifier() id := meta.GetIdentifier()
f.Identifiers = append(f.Identifiers, &id) gv, err := schema.ParseGroupVersion(id.APIVersion)
if err != nil {
return nil, err
}
f.Identifiers = append(f.Identifiers, wait.ResourceIdentifier{
Name: id.Name,
Namespace: id.Namespace,
GroupKind: schema.GroupKind{
Group: gv.Group,
Kind: id.Kind,
},
})
} }
return slice, nil return slice, nil
} }

View File

@@ -19,7 +19,7 @@ import (
// GetWaitRunner return a command WaitRunner. // GetWaitRunner return a command WaitRunner.
func GetWaitRunner() *WaitRunner { func GetWaitRunner() *WaitRunner {
r := &WaitRunner{ r := &WaitRunner{
createClientFunc: createClient, newResolverFunc: newResolver,
} }
c := &cobra.Command{ c := &cobra.Command{
Use: "wait DIR...", Use: "wait DIR...",
@@ -51,7 +51,7 @@ type WaitRunner struct {
Timeout time.Duration Timeout time.Duration
Command *cobra.Command Command *cobra.Command
createClientFunc createClientFunc newResolverFunc newResolverFunc
} }
// runE implements the logic of the command and will call the Wait command in the wait // runE implements the logic of the command and will call the Wait command in the wait
@@ -59,12 +59,11 @@ type WaitRunner struct {
// TablePrinter to display the information. // TablePrinter to display the information.
func (r *WaitRunner) runE(c *cobra.Command, args []string) error { func (r *WaitRunner) runE(c *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
client, err := r.createClientFunc()
if err != nil {
return errors.Wrap(err, "error creating client")
}
resolver := wait.NewResolver(client, r.Interval) resolver, err := r.newResolverFunc(r.Interval)
if err != nil {
return errors.Wrap(err, "errors creating resolver")
}
captureFilter := &CaptureIdentifiersFilter{} captureFilter := &CaptureIdentifiersFilter{}
filters := []kio.Filter{captureFilter} filters := []kio.Filter{captureFilter}
@@ -131,10 +130,9 @@ func (r *ResourceStatusCollector) updateResourceStatus(msg wait.Event) {
r.AggregateStatus = msg.AggregateStatus r.AggregateStatus = msg.AggregateStatus
eventResource := msg.EventResource eventResource := msg.EventResource
for _, resourceState := range r.ResourceStatuses { for _, resourceState := range r.ResourceStatuses {
if resourceState.Identifier.GetAPIVersion() == eventResource.Identifier.GetAPIVersion() && if resourceState.Identifier.GroupKind == eventResource.ResourceIdentifier.GroupKind &&
resourceState.Identifier.GetKind() == eventResource.Identifier.GetKind() && resourceState.Identifier.Namespace == eventResource.ResourceIdentifier.Namespace &&
resourceState.Identifier.GetNamespace() == eventResource.Identifier.GetNamespace() && resourceState.Identifier.Name == eventResource.ResourceIdentifier.Name {
resourceState.Identifier.GetName() == eventResource.Identifier.GetName() {
resourceState.Status = eventResource.Status resourceState.Status = eventResource.Status
resourceState.Message = eventResource.Message resourceState.Message = eventResource.Message
} }

View File

@@ -8,6 +8,8 @@ import (
"github.com/acarl005/stripansi" "github.com/acarl005/stripansi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/status"
) )
@@ -18,7 +20,7 @@ func TestWaitNoResources(t *testing.T) {
fakeClient := &FakeClient{} fakeClient := &FakeClient{}
r := GetWaitRunner() r := GetWaitRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient)
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -72,7 +74,7 @@ metadata:
} }
r := GetWaitRunner() r := GetWaitRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, appsv1.SchemeGroupVersion.WithKind("Deployment"))
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)
@@ -144,7 +146,8 @@ items:
} }
r := GetWaitRunner() r := GetWaitRunner()
r.createClientFunc = newClientFunc(fakeClient) r.newResolverFunc = fakeResolver(fakeClient, corev1.SchemeGroupVersion.WithKind("Pod"),
corev1.SchemeGroupVersion.WithKind("Service"))
r.Command.SetArgs([]string{}) r.Command.SetArgs([]string{})
r.Command.SetIn(inBuffer) r.Command.SetIn(inBuffer)
r.Command.SetOut(outBuffer) r.Command.SetOut(outBuffer)

View File

@@ -5,24 +5,28 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
) )
// keyFromResourceIdentifier creates a resourceKey from a ResourceIdentifier. func resourceIdentifierFromObject(object KubernetesObject) ResourceIdentifier {
func keyFromResourceIdentifier(i ResourceIdentifier) resourceKey { return ResourceIdentifier{
return resourceKey{ Name: object.GetName(),
apiVersion: i.GetAPIVersion(), Namespace: object.GetNamespace(),
kind: i.GetKind(), GroupKind: object.GroupVersionKind().GroupKind(),
name: i.GetName(),
namespace: i.GetNamespace(),
} }
} }
// keyFromObject creates a resourceKey from an Object. func resourceIdentifiersFromObjects(objects []KubernetesObject) []ResourceIdentifier {
func keyFromObject(obj runtime.Object) resourceKey { var resourceIdentifiers []ResourceIdentifier
gvk := obj.GetObjectKind().GroupVersionKind() for _, object := range objects {
r := obj.(metav1.Object) resourceIdentifiers = append(resourceIdentifiers, resourceIdentifierFromObject(object))
return resourceKey{ }
apiVersion: gvk.GroupVersion().String(), return resourceIdentifiers
kind: gvk.Kind, }
name: r.GetName(),
namespace: r.GetNamespace(), func resourceIdentifierFromRuntimeObject(object runtime.Object) ResourceIdentifier {
gvk := object.GetObjectKind().GroupVersionKind()
r := object.(metav1.Object)
return ResourceIdentifier{
GroupKind: gvk.GroupKind(),
Name: r.GetName(),
Namespace: r.GetNamespace(),
} }
} }

View File

@@ -10,28 +10,59 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/status"
) )
const (
defaultNamespace = "default"
)
// ResourceIdentifier defines the functions needed to identify // ResourceIdentifier defines the functions needed to identify
// a resource in a cluster. This interface is implemented by // a resource in a cluster. This interface is implemented by
// both unstructured.Unstructured and the standard Kubernetes types. // both unstructured.Unstructured and the standard Kubernetes types.
type ResourceIdentifier interface { type KubernetesObject interface {
GetName() string GetName() string
GetNamespace() string GetNamespace() string
GetAPIVersion() string GroupVersionKind() schema.GroupVersionKind
GetKind() string }
// ResourceIdentifier contains the information needed to uniquely
// identify a resource in a cluster.
type ResourceIdentifier struct {
Name string
Namespace string
GroupKind schema.GroupKind
}
// Equals compares two ResourceIdentifiers and returns true if they
// refer to the same resource. Special handling is needed for namespace
// since an empty namespace for a namespace-scoped resource is defaulted
// to the "default" namespace.
func (r ResourceIdentifier) Equals(other ResourceIdentifier) bool {
isSameNamespace := r.Namespace == other.Namespace ||
(r.Namespace == "" && other.Namespace == defaultNamespace) ||
(r.Namespace == defaultNamespace && other.Namespace == "")
return r.GroupKind == other.GroupKind &&
r.Name == other.Name &&
isSameNamespace
} }
// Resolver provides the functions for resolving status of a list of resources. // Resolver provides the functions for resolving status of a list of resources.
type Resolver struct { type Resolver struct {
// DynamicClient is the client used to talk // client is the client used to talk
// with the cluster // with the cluster. It uses the Reader interface
// from controller-runtime.
client client.Reader client client.Reader
// mapper is the RESTMapper needed to look up mappings
// for resource types.
mapper meta.RESTMapper
// statusComputeFunc defines which function should be used for computing // statusComputeFunc defines which function should be used for computing
// the status of a resource. This is available for testing purposes. // the status of a resource. This is available for testing purposes.
statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error) statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)
@@ -44,9 +75,10 @@ type Resolver struct {
// NewResolver creates a new resolver with the provided client. Fetching // NewResolver creates a new resolver with the provided client. Fetching
// and polling of resources will be done using the provided client. // and polling of resources will be done using the provided client.
func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver { func NewResolver(client client.Reader, mapper meta.RESTMapper, pollInterval time.Duration) *Resolver {
return &Resolver{ return &Resolver{
client: client, client: client,
mapper: mapper,
statusComputeFunc: status.Compute, statusComputeFunc: status.Compute,
pollInterval: pollInterval, pollInterval: pollInterval,
} }
@@ -58,24 +90,31 @@ func NewResolver(client client.Reader, pollInterval time.Duration) *Resolver {
type ResourceResult struct { type ResourceResult struct {
Result *status.Result Result *status.Result
Resource ResourceIdentifier ResourceIdentifier ResourceIdentifier
Error error Error error
} }
// FetchAndResolve returns the status for a list of resources. It will return // FetchAndResolveObjects returns the status for a list of kubernetes objects. These can be provided
// the status for each of them individually. The slice of ResourceIdentifiers will // either as Unstructured resources or the specific resource types. It will return the status for each
// only be used to get the information needed to fetch the updated state of // of them individually. The provided resources will only be used to get the information needed to
// the resources from the cluster. // fetch the updated state of the resources from the cluster.
func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIdentifier) []ResourceResult { func (r *Resolver) FetchAndResolveObjects(ctx context.Context, objects []KubernetesObject) []ResourceResult {
resourceIds := resourceIdentifiersFromObjects(objects)
return r.FetchAndResolve(ctx, resourceIds)
}
// FetchAndResolve returns the status for a list of ResourceIdentifiers. It will return
// the status for each of them individually.
func (r *Resolver) FetchAndResolve(ctx context.Context, resourceIDs []ResourceIdentifier) []ResourceResult {
var results []ResourceResult var results []ResourceResult
for _, resource := range resources { for _, resourceID := range resourceIDs {
u, err := r.fetchResource(ctx, resource) u, err := r.fetchResource(ctx, resourceID)
if err != nil { if err != nil {
if k8serrors.IsNotFound(errors.Cause(err)) { if k8serrors.IsNotFound(errors.Cause(err)) {
results = append(results, ResourceResult{ results = append(results, ResourceResult{
Resource: resource, ResourceIdentifier: resourceID,
Result: &status.Result{ Result: &status.Result{
Status: status.CurrentStatus, Status: status.CurrentStatus,
Message: "Resource does not exist", Message: "Resource does not exist",
@@ -87,7 +126,7 @@ func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIden
Status: status.UnknownStatus, Status: status.UnknownStatus,
Message: fmt.Sprintf("Error fetching resource from cluster: %v", err), Message: fmt.Sprintf("Error fetching resource from cluster: %v", err),
}, },
Resource: resource, ResourceIdentifier: resourceID,
Error: err, Error: err,
}) })
} }
@@ -96,7 +135,7 @@ func (r *Resolver) FetchAndResolve(ctx context.Context, resources []ResourceIden
res, err := r.statusComputeFunc(u) res, err := r.statusComputeFunc(u)
results = append(results, ResourceResult{ results = append(results, ResourceResult{
Result: res, Result: res,
Resource: resource, ResourceIdentifier: resourceID,
Error: err, Error: err,
}) })
} }
@@ -139,7 +178,7 @@ const (
type EventResource struct { type EventResource struct {
// Identifier contains information that identifies which resource // Identifier contains information that identifies which resource
// this information is about. // this information is about.
Identifier ResourceIdentifier ResourceIdentifier ResourceIdentifier
// Status is the latest status for the given resource. // Status is the latest status for the given resource.
Status status.Status Status status.Status
@@ -153,9 +192,18 @@ type EventResource struct {
Error error Error error
} }
// WaitForStatus polls all the provided resources until all of them has // WaitForStatus polls all the provided resources until all of them have reached the Current
// reached the Current status. Updates the channel as resources change their status and // status or the timeout specified through the context is reached. Updates on the status
// when the wait is either completed or aborted. // of individual resources and the aggregate status is provided through the Event channel.
func (r *Resolver) WaitForStatusOfObjects(ctx context.Context, objects []KubernetesObject) <-chan Event {
resourceIds := resourceIdentifiersFromObjects(objects)
return r.WaitForStatus(ctx, resourceIds)
}
// WaitForStatus polls all the resources references by the provided ResourceIdentifiers until
// all of them have reached the Current status or the timeout specified through the context is
// reached. Updates on the status of individual resources and the aggregate status is provided
// through the Event channel.
func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event { func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdentifier) <-chan Event {
eventChan := make(chan Event) eventChan := make(chan Event)
@@ -225,12 +273,11 @@ func (r *Resolver) WaitForStatus(ctx context.Context, resources []ResourceIdenti
// Completed type event. If the aggregate status has become Current, this function // Completed type event. If the aggregate status has become Current, this function
// will return true to signal that it is done. // will return true to signal that it is done.
func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, eventChan chan Event) bool { func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState, eventChan chan Event) bool {
for id := range waitState.ResourceWaitStates { for resourceID := range waitState.ResourceWaitStates {
// Make sure we have a local copy since we are passing // Make sure we have a local copy since we are passing
// pointers to this variable as parameters to functions // pointers to this variable as parameters to functions
identifier := id u, err := r.fetchResource(ctx, resourceID)
u, err := r.fetchResource(ctx, &identifier) eventResource, updateObserved := waitState.ResourceObserved(resourceID, u, err)
eventResource, updateObserved := waitState.ResourceObserved(&identifier, u, err)
// Find the aggregate status based on the new state for this resource. // Find the aggregate status based on the new state for this resource.
aggStatus := waitState.AggregateStatus() aggStatus := waitState.AggregateStatus()
// We want events for changes in status for each resource, so send // We want events for changes in status for each resource, so send
@@ -259,15 +306,25 @@ func (r *Resolver) checkAllResources(ctx context.Context, waitState *waitState,
// through the client available in the Resolver. It returns the resource // through the client available in the Resolver. It returns the resource
// as an Unstructured. // as an Unstructured.
func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) { func (r *Resolver) fetchResource(ctx context.Context, identifier ResourceIdentifier) (*unstructured.Unstructured, error) {
key := types.NamespacedName{Name: identifier.GetName(), Namespace: identifier.GetNamespace()} // We need to look up the preferred version for the GroupKind and
u := &unstructured.Unstructured{ // whether the resource type is cluster scoped. We look this
Object: map[string]interface{}{ // up with the RESTMapper.
"apiVersion": identifier.GetAPIVersion(), mapping, err := r.mapper.RESTMapping(identifier.GroupKind)
"kind": identifier.GetKind(), if err != nil {
}, return nil, err
} }
err := r.client.Get(ctx, key, u)
//return u, err // Resources might not have the namespace set, which means we need to set
// it to `default` if the resource is namespace scoped.
namespace := identifier.Namespace
if namespace == "" && mapping.Scope.Name() == meta.RESTScopeNameNamespace {
namespace = defaultNamespace
}
key := types.NamespacedName{Name: identifier.Name, Namespace: namespace}
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(mapping.GroupVersionKind)
err = r.client.Get(ctx, key, u)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error fetching resource from cluster") return nil, errors.Wrap(err, "error fetching resource from cluster")
} }

View File

@@ -10,9 +10,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -32,6 +34,7 @@ func TestFetchAndResolve(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
resources []runtime.Object resources []runtime.Object
mapperGVKs []schema.GroupVersionKind
expectedResults []result expectedResults []result
}{ }{
"no resources": { "no resources": {
@@ -52,6 +55,9 @@ func TestFetchAndResolve(t *testing.T) {
}, },
}, },
}, },
mapperGVKs: []schema.GroupVersionKind{
appsv1.SchemeGroupVersion.WithKind("Deployment"),
},
expectedResults: []result{ expectedResults: []result{
{ {
status: status.InProgressStatus, status: status.InProgressStatus,
@@ -92,6 +98,10 @@ func TestFetchAndResolve(t *testing.T) {
}, },
}, },
}, },
mapperGVKs: []schema.GroupVersionKind{
appsv1.SchemeGroupVersion.WithKind("StatefulSet"),
corev1.SchemeGroupVersion.WithKind("Secret"),
},
expectedResults: []result{ expectedResults: []result{
{ {
status: status.CurrentStatus, status: status.CurrentStatus,
@@ -109,18 +119,18 @@ func TestFetchAndResolve(t *testing.T) {
tc := tc tc := tc
t.Run(tn, func(t *testing.T) { t.Run(tn, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, tc.resources...) fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, tc.resources...)
resolver := NewResolver(fakeClient, testPollInterval)
resolver := NewResolver(fakeClient, newRESTMapper(tc.mapperGVKs...), testPollInterval)
resolver.statusComputeFunc = status.Compute resolver.statusComputeFunc = status.Compute
var identifiers []ResourceIdentifier var identifiers []ResourceIdentifier
for _, resource := range tc.resources { for _, resource := range tc.resources {
gvk := resource.GetObjectKind().GroupVersionKind() gvk := resource.GetObjectKind().GroupVersionKind()
r := resource.(metav1.Object) r := resource.(metav1.Object)
identifiers = append(identifiers, &resourceKey{ identifiers = append(identifiers, ResourceIdentifier{
name: r.GetName(), Name: r.GetName(),
namespace: r.GetNamespace(), Namespace: r.GetNamespace(),
apiVersion: gvk.GroupVersion().String(), GroupKind: gvk.GroupKind(),
kind: gvk.Kind,
}) })
} }
@@ -128,7 +138,7 @@ func TestFetchAndResolve(t *testing.T) {
for i, res := range results { for i, res := range results {
id := identifiers[i] id := identifiers[i]
expectedRes := tc.expectedResults[i] expectedRes := tc.expectedResults[i]
rid := fmt.Sprintf("%s/%s", id.GetNamespace(), id.GetName()) rid := fmt.Sprintf("%s/%s", id.Namespace, id.Name)
if expectedRes.error { if expectedRes.error {
if res.Error == nil { if res.Error == nil {
t.Errorf("expected error for resource %s, but didn't get one", rid) t.Errorf("expected error for resource %s, but didn't get one", rid)
@@ -150,13 +160,15 @@ func TestFetchAndResolve(t *testing.T) {
func TestFetchAndResolveUnknownResource(t *testing.T) { func TestFetchAndResolveUnknownResource(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme) fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
resolver := NewResolver(fakeClient, testPollInterval) resolver := NewResolver(fakeClient, newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")), testPollInterval)
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{ {
apiVersion: "apps/v1", GroupKind: schema.GroupKind{
kind: "Deploymnet", Group: "apps",
name: "myDeployment", Kind: "Deployment",
namespace: "default", },
Name: "myDeployment",
Namespace: "default",
}, },
}) })
@@ -181,14 +193,17 @@ func TestFetchAndResolveWithFetchError(t *testing.T) {
&fakeReader{ &fakeReader{
Err: expectedError, Err: expectedError,
}, },
newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")),
testPollInterval, testPollInterval,
) )
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{ {
apiVersion: "apps/v1", GroupKind: schema.GroupKind{
kind: "Deploymnet", Group: "apps",
name: "myDeployment", Kind: "Deployment",
namespace: "default", },
Name: "myDeployment",
Namespace: "default",
}, },
}) })
@@ -222,7 +237,7 @@ func TestFetchAndResolveComputeStatusError(t *testing.T) {
} }
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, resource) fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, resource)
resolver := NewResolver(fakeClient, testPollInterval) resolver := NewResolver(fakeClient, newRESTMapper(appsv1.SchemeGroupVersion.WithKind("Deployment")), testPollInterval)
resolver.statusComputeFunc = func(u *unstructured.Unstructured) (*status.Result, error) { resolver.statusComputeFunc = func(u *unstructured.Unstructured) (*status.Result, error) {
return &status.Result{ return &status.Result{
@@ -231,11 +246,13 @@ func TestFetchAndResolveComputeStatusError(t *testing.T) {
}, expectedError }, expectedError
} }
results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{ results := resolver.FetchAndResolve(context.TODO(), []ResourceIdentifier{
&resourceKey{ {
apiVersion: resource.APIVersion, GroupKind: schema.GroupKind{
kind: resource.Kind, Group: resource.GroupVersionKind().Group,
name: resource.GetName(), Kind: resource.Kind,
namespace: resource.GetNamespace(), },
Name: resource.GetName(),
Namespace: resource.GetNamespace(),
}, },
}) })
@@ -358,23 +375,28 @@ func TestWaitForStatus(t *testing.T) {
tc := tc tc := tc
t.Run(tn, func(t *testing.T) { t.Run(tn, func(t *testing.T) {
var objs []runtime.Object var objs []runtime.Object
statusResults := make(map[resourceKey][]*status.Result) statusResults := make(map[ResourceIdentifier][]*status.Result)
var identifiers []ResourceIdentifier var identifiers []ResourceIdentifier
for obj, statuses := range tc.resources { for obj, statuses := range tc.resources {
objs = append(objs, obj) objs = append(objs, obj)
identifier := keyFromObject(obj) identifier := resourceIdentifierFromRuntimeObject(obj)
identifiers = append(identifiers, &identifier) identifiers = append(identifiers, identifier)
statusResults[identifier] = statuses statusResults[identifier] = statuses
} }
statusComputer := statusComputer{ statusComputer := statusComputer{
results: statusResults, results: statusResults,
resourceCallCount: make(map[resourceKey]int), resourceCallCount: make(map[ResourceIdentifier]int),
} }
resolver := &Resolver{ resolver := &Resolver{
client: fake.NewFakeClientWithScheme(scheme.Scheme, objs...), client: fake.NewFakeClientWithScheme(scheme.Scheme, objs...),
mapper: newRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
appsv1.SchemeGroupVersion.WithKind("StatefulSet"),
corev1.SchemeGroupVersion.WithKind("Service"),
),
statusComputeFunc: statusComputer.Compute, statusComputeFunc: statusComputer.Compute,
pollInterval: testPollInterval, pollInterval: testPollInterval,
} }
@@ -397,20 +419,20 @@ func TestWaitForStatus(t *testing.T) {
} }
var aggregateStatuses []status.Status var aggregateStatuses []status.Status
resourceStatuses := make(map[resourceKey][]status.Status) resourceStatuses := make(map[ResourceIdentifier][]status.Status)
for _, e := range events { for _, e := range events {
aggregateStatuses = append(aggregateStatuses, e.AggregateStatus) aggregateStatuses = append(aggregateStatuses, e.AggregateStatus)
if e.EventResource != nil { if e.EventResource != nil {
identifier := keyFromResourceIdentifier(e.EventResource.Identifier) identifier := e.EventResource.ResourceIdentifier
resourceStatuses[identifier] = append(resourceStatuses[identifier], e.EventResource.Status) resourceStatuses[identifier] = append(resourceStatuses[identifier], e.EventResource.Status)
} }
} }
for resource, expectedStatuses := range tc.expectedResourceStatuses { for resource, expectedStatuses := range tc.expectedResourceStatuses {
identifier := keyFromObject(resource) identifier := resourceIdentifierFromRuntimeObject(resource)
actualStatuses := resourceStatuses[identifier] actualStatuses := resourceStatuses[identifier]
if !reflect.DeepEqual(expectedStatuses, actualStatuses) { if !reflect.DeepEqual(expectedStatuses, actualStatuses) {
t.Errorf("expected statuses %v for resource %s/%s, but got %v", expectedStatuses, identifier.namespace, identifier.name, actualStatuses) t.Errorf("expected statuses %v for resource %s/%s, but got %v", expectedStatuses, identifier.Namespace, identifier.Name, actualStatuses)
} }
} }
@@ -423,21 +445,25 @@ func TestWaitForStatus(t *testing.T) {
func TestWaitForStatusDeletedResources(t *testing.T) { func TestWaitForStatusDeletedResources(t *testing.T) {
statusComputer := statusComputer{ statusComputer := statusComputer{
results: make(map[resourceKey][]*status.Result), results: make(map[ResourceIdentifier][]*status.Result),
resourceCallCount: make(map[resourceKey]int), resourceCallCount: make(map[ResourceIdentifier]int),
} }
resolver := &Resolver{ resolver := &Resolver{
client: fake.NewFakeClientWithScheme(scheme.Scheme), client: fake.NewFakeClientWithScheme(scheme.Scheme),
mapper: newRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
corev1.SchemeGroupVersion.WithKind("Service"),
),
statusComputeFunc: statusComputer.Compute, statusComputeFunc: statusComputer.Compute,
pollInterval: testPollInterval, pollInterval: testPollInterval,
} }
depResourceIdentifier := keyFromObject(deploymentResource) depResourceIdentifier := resourceIdentifierFromRuntimeObject(deploymentResource)
serviceResourceIdentifier := keyFromObject(serviceResource) serviceResourceIdentifier := resourceIdentifierFromRuntimeObject(serviceResource)
identifiers := []ResourceIdentifier{ identifiers := []ResourceIdentifier{
&depResourceIdentifier, depResourceIdentifier,
&serviceResourceIdentifier, serviceResourceIdentifier,
} }
eventChan := resolver.WaitForStatus(context.TODO(), identifiers) eventChan := resolver.WaitForStatus(context.TODO(), identifiers)
@@ -499,17 +525,12 @@ loop:
type statusComputer struct { type statusComputer struct {
t *testing.T t *testing.T
results map[resourceKey][]*status.Result results map[ResourceIdentifier][]*status.Result
resourceCallCount map[resourceKey]int resourceCallCount map[ResourceIdentifier]int
} }
func (s *statusComputer) Compute(u *unstructured.Unstructured) (*status.Result, error) { func (s *statusComputer) Compute(u *unstructured.Unstructured) (*status.Result, error) {
identifier := resourceKey{ identifier := resourceIdentifierFromRuntimeObject(u)
apiVersion: u.GetAPIVersion(),
kind: u.GetKind(),
name: u.GetName(),
namespace: u.GetNamespace(),
}
resourceResults, ok := s.results[identifier] resourceResults, ok := s.results[identifier]
if !ok { if !ok {
@@ -559,3 +580,15 @@ var serviceResource = &corev1.Service{
Namespace: "default", Namespace: "default",
}, },
} }
func newRESTMapper(gvks ...schema.GroupVersionKind) meta.RESTMapper {
var groupVersions []schema.GroupVersion
for _, gvk := range gvks {
groupVersions = append(groupVersions, gvk.GroupVersion())
}
mapper := meta.NewDefaultRESTMapper(groupVersions)
for _, gvk := range gvks {
mapper.Add(gvk, meta.RESTScopeNamespace)
}
return mapper
}

View File

@@ -10,41 +10,12 @@ import (
"sigs.k8s.io/kustomize/kstatus/status" "sigs.k8s.io/kustomize/kstatus/status"
) )
// resourceKey is a minimal implementation of
// the ResourceIdentifier interface.
type resourceKey struct {
name string
namespace string
apiVersion string
kind string
}
// GetName returns the name of the resource.
func (r *resourceKey) GetName() string {
return r.name
}
// GetNamespace returns the namespace of the resource.
func (r *resourceKey) GetNamespace() string {
return r.namespace
}
// GetAPIVersion returns the API version of the resource.
func (r *resourceKey) GetAPIVersion() string {
return r.apiVersion
}
// GetKind returns the Kind of the resource.
func (r *resourceKey) GetKind() string {
return r.kind
}
// waitState keeps the state about the resources and their last // waitState keeps the state about the resources and their last
// observed state. This is used to determine any changes in state // observed state. This is used to determine any changes in state
// so events can be sent when needed. // so events can be sent when needed.
type waitState struct { type waitState struct {
// ResourceWaitStates contains wait state for each of the resources. // ResourceWaitStates contains wait state for each of the resources.
ResourceWaitStates map[resourceKey]*resourceWaitState ResourceWaitStates map[ResourceIdentifier]*resourceWaitState
// statusComputeFunc defines the function used to compute the state of // statusComputeFunc defines the function used to compute the state of
// a single resource. This is available for testing purposes. // a single resource. This is available for testing purposes.
@@ -62,17 +33,11 @@ type resourceWaitState struct {
// newWaitState creates a new waitState object and initializes it with the // newWaitState creates a new waitState object and initializes it with the
// provided slice of resources and the provided statusComputeFunc. // provided slice of resources and the provided statusComputeFunc.
func newWaitState(resources []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState { func newWaitState(resourceIDs []ResourceIdentifier, statusComputeFunc func(u *unstructured.Unstructured) (*status.Result, error)) *waitState {
resourceWaitStates := make(map[resourceKey]*resourceWaitState) resourceWaitStates := make(map[ResourceIdentifier]*resourceWaitState)
for _, r := range resources { for _, resourceID := range resourceIDs {
identifier := resourceKey{ resourceWaitStates[resourceID] = &resourceWaitState{}
apiVersion: r.GetAPIVersion(),
kind: r.GetKind(),
name: r.GetName(),
namespace: r.GetNamespace(),
}
resourceWaitStates[identifier] = &resourceWaitState{}
} }
return &waitState{ return &waitState{
@@ -107,19 +72,12 @@ func (w *waitState) AggregateStatus() status.Status {
// that will be true if the status of the observed resource has changed // that will be true if the status of the observed resource has changed
// since the previous observation and false it not. This is used to determine // since the previous observation and false it not. This is used to determine
// whether a new event should be sent based on this observation. // whether a new event should be sent based on this observation.
func (w *waitState) ResourceObserved(id ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) { func (w *waitState) ResourceObserved(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) (EventResource, bool) {
identifier := resourceKey{
name: id.GetName(),
namespace: id.GetNamespace(),
apiVersion: id.GetAPIVersion(),
kind: id.GetKind(),
}
// Check for nil is not needed here as the id passed in comes // Check for nil is not needed here as the id passed in comes
// from iterating over the keys of the map. // from iterating over the keys of the map.
rws := w.ResourceWaitStates[identifier] rws := w.ResourceWaitStates[resourceID]
eventResource := w.getEventResource(identifier, resource, err) eventResource := w.getEventResource(resourceID, resource, err)
// If the new eventResource is identical to the previous one, we return // If the new eventResource is identical to the previous one, we return
// with the last return value indicating this is not a new event. // with the last return value indicating this is not a new event.
if rws.LastEvent != nil && reflect.DeepEqual(eventResource, *rws.LastEvent) { if rws.LastEvent != nil && reflect.DeepEqual(eventResource, *rws.LastEvent) {
@@ -133,19 +91,19 @@ func (w *waitState) ResourceObserved(id ResourceIdentifier, resource *unstructur
// the provided resourceKey. The EventResource contains information about the // the provided resourceKey. The EventResource contains information about the
// latest status for the given resource, so it computes status for the resource // latest status for the given resource, so it computes status for the resource
// as well as check for deletion. // as well as check for deletion.
func (w *waitState) getEventResource(identifier resourceKey, resource *unstructured.Unstructured, err error) EventResource { func (w *waitState) getEventResource(resourceID ResourceIdentifier, resource *unstructured.Unstructured, err error) EventResource {
// Get the resourceWaitState for this resource. It contains information // Get the resourceWaitState for this resource. It contains information
// of the previous observed statuses. We don't need to check for nil here // of the previous observed statuses. We don't need to check for nil here
// as the identifier comes from iterating over the keys of the // as the identifier comes from iterating over the keys of the
// ResourceWaitState map. // ResourceWaitState map.
r := w.ResourceWaitStates[identifier] r := w.ResourceWaitStates[resourceID]
// If fetching the resource from the cluster failed, we don't really // If fetching the resource from the cluster failed, we don't really
// know anything about the status of the resource, so simply // know anything about the status of the resource, so simply
// report the status as Unknown. // report the status as Unknown.
if err != nil && !k8serrors.IsNotFound(errors.Cause(err)) { if err != nil && !k8serrors.IsNotFound(errors.Cause(err)) {
return EventResource{ return EventResource{
Identifier: &identifier, ResourceIdentifier: resourceID,
Status: status.UnknownStatus, Status: status.UnknownStatus,
Message: fmt.Sprintf("Error: %s", err), Message: fmt.Sprintf("Error: %s", err),
Error: err, Error: err,
@@ -161,7 +119,7 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu
if k8serrors.IsNotFound(errors.Cause(err)) { if k8serrors.IsNotFound(errors.Cause(err)) {
r.HasBeenCurrent = true r.HasBeenCurrent = true
return EventResource{ return EventResource{
Identifier: &identifier, ResourceIdentifier: resourceID,
Status: status.CurrentStatus, Status: status.CurrentStatus,
Message: fmt.Sprintf("Resource has been deleted"), Message: fmt.Sprintf("Resource has been deleted"),
} }
@@ -177,7 +135,7 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu
if resource.GetDeletionTimestamp() != nil { if resource.GetDeletionTimestamp() != nil {
return EventResource{ return EventResource{
Identifier: &identifier, ResourceIdentifier: resourceID,
Status: status.TerminatingStatus, Status: status.TerminatingStatus,
Message: fmt.Sprintf("Resource is terminating"), Message: fmt.Sprintf("Resource is terminating"),
} }
@@ -188,7 +146,7 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu
// as Unknown. // as Unknown.
if err != nil { if err != nil {
return EventResource{ return EventResource{
Identifier: &identifier, ResourceIdentifier: resourceID,
Status: status.UnknownStatus, Status: status.UnknownStatus,
Message: fmt.Sprintf("Error: %s", err), Message: fmt.Sprintf("Error: %s", err),
Error: err, Error: err,
@@ -204,7 +162,7 @@ func (w *waitState) getEventResource(identifier resourceKey, resource *unstructu
} }
return EventResource{ return EventResource{
Identifier: &identifier, ResourceIdentifier: resourceID,
Status: statusResult.Status, Status: statusResult.Status,
Message: statusResult.Message, Message: statusResult.Message,
} }