mirror of
https://github.com/kubernetes-sigs/kustomize.git
synced 2026-06-10 08:20:59 +00:00
Merge pull request #2114 from mortent/FlexiblePrinting
Restructure the Apply command to separate printing from the code that actually does the work
This commit is contained in:
@@ -3,7 +3,9 @@ module sigs.k8s.io/kustomize/cmd/kubectl
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/go-errors/errors v1.0.1
|
||||
github.com/spf13/cobra v0.0.5
|
||||
github.com/stretchr/testify v1.4.0
|
||||
k8s.io/api v0.17.0
|
||||
k8s.io/apimachinery v0.17.0
|
||||
k8s.io/cli-runtime v0.17.0
|
||||
|
||||
@@ -83,6 +83,7 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
|
||||
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
|
||||
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
@@ -364,7 +365,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 h1:rjwSpXsdiK0dV8/Naq3kAw9ymfAeJIyd0upUIElB+lI=
|
||||
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
@@ -504,7 +504,6 @@ k8s.io/kubectl v0.0.0-20191219154910-1528d4eea6dd h1:nZX5+wEqTu/EBIYjrZlFOA63z4+
|
||||
k8s.io/kubectl v0.0.0-20191219154910-1528d4eea6dd/go.mod h1:9ehGcuUGjXVZh0qbYSB0vvofQw2JQe6c6cO0k4wu/Oo=
|
||||
k8s.io/metrics v0.0.0-20191214191643-6b1944c9f765/go.mod h1:5V7rewilItwK0cz4nomU0b3XCcees2Ka5EBYWS1HBeM=
|
||||
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
|
||||
k8s.io/utils v0.0.0-20191030222137-2b95a09bc58d/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
|
||||
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCuiMMNF8YUVLF3vJo=
|
||||
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
|
||||
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
|
||||
|
||||
205
cmd/kubectl/kubectlcobra/applier.go
Normal file
205
cmd/kubectl/kubectlcobra/applier.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// Copyright 2019 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/kubectl/pkg/cmd/apply"
|
||||
"k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/scheme"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/kustomize/kstatus/wait"
|
||||
)
|
||||
|
||||
// newApplier returns a new Applier. It will set up the applyOptions and
|
||||
// statusOptions which are responsible for capturing any command line flags.
|
||||
// It currently requires IOStreams, but this is a legacy from when
|
||||
// the ApplyOptions were responsible for printing progress. This is now
|
||||
// handled by a separate printer with the KubectlPrinterAdapter bridging
|
||||
// between the two.
|
||||
func newApplier(factory util.Factory, ioStreams genericclioptions.IOStreams) *Applier {
|
||||
return &Applier{
|
||||
applyOptions: apply.NewApplyOptions(ioStreams),
|
||||
statusOptions: NewStatusOptions(),
|
||||
factory: factory,
|
||||
ioStreams: ioStreams,
|
||||
}
|
||||
}
|
||||
|
||||
// resolver defines the interface the applier needs to observe status for resources.
|
||||
type resolver interface {
|
||||
WaitForStatusOfObjects(ctx context.Context, objects []wait.KubernetesObject) <-chan wait.Event
|
||||
}
|
||||
|
||||
// Applier performs the step of applying a set of resources into a cluster,
|
||||
// conditionally waits for all of them to be fully reconciled and finally
|
||||
// performs prune to clean up any resources that has been deleted.
|
||||
type Applier struct {
|
||||
factory util.Factory
|
||||
ioStreams genericclioptions.IOStreams
|
||||
|
||||
applyOptions *apply.ApplyOptions
|
||||
statusOptions *StatusOptions
|
||||
resolver resolver
|
||||
}
|
||||
|
||||
// Initialize sets up the Applier for actually doing an apply against
|
||||
// a cluster. This involves validating command line inputs and configuring
|
||||
// clients for communicating with the cluster.
|
||||
func (a *Applier) Initialize(cmd *cobra.Command) error {
|
||||
a.applyOptions.PreProcessorFn = PrependGroupingObject(a.applyOptions)
|
||||
err := a.applyOptions.Complete(a.factory, cmd)
|
||||
if err != nil {
|
||||
return errors.WrapPrefix(err, "error setting up ApplyOptions", 1)
|
||||
}
|
||||
// Default PostProcessor is configured in "Complete" function,
|
||||
// so the prune must happen after "Complete".
|
||||
a.applyOptions.PostProcessorFn = prune(a.factory, a.applyOptions)
|
||||
|
||||
resolver, err := a.newResolver(a.statusOptions.period)
|
||||
if err != nil {
|
||||
return errors.WrapPrefix(err, "error creating resolver", 1)
|
||||
}
|
||||
a.resolver = resolver
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetFlags configures the command line flags needed for apply and
|
||||
// status. This is a temporary solution as we should separate the configuration
|
||||
// of cobra flags from the Applier.
|
||||
func (a *Applier) SetFlags(cmd *cobra.Command) {
|
||||
a.applyOptions.DeleteFlags.AddFlags(cmd)
|
||||
a.applyOptions.RecordFlags.AddFlags(cmd)
|
||||
a.applyOptions.PrintFlags.AddFlags(cmd)
|
||||
a.statusOptions.AddFlags(cmd)
|
||||
a.applyOptions.Overwrite = true
|
||||
}
|
||||
|
||||
// newResolver sets up a new Resolver for computing status. The configuration
|
||||
// needed for the resolver is taken from the Factory.
|
||||
func (a *Applier) newResolver(pollInterval time.Duration) (*wait.Resolver, error) {
|
||||
config, err := a.factory.ToRESTConfig()
|
||||
if err != nil {
|
||||
return nil, errors.WrapPrefix(err, "error getting RESTConfig", 1)
|
||||
}
|
||||
|
||||
mapper, err := a.factory.ToRESTMapper()
|
||||
if err != nil {
|
||||
return nil, errors.WrapPrefix(err, "error getting RESTMapper", 1)
|
||||
}
|
||||
|
||||
c, err := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
|
||||
if err != nil {
|
||||
return nil, errors.WrapPrefix(err, "error creating client", 1)
|
||||
}
|
||||
|
||||
return wait.NewResolver(c, mapper, pollInterval), nil
|
||||
}
|
||||
|
||||
// Run performs the Apply step. This happens asynchronously with updates
|
||||
// on progress and any errors are reported back on the event channel.
|
||||
// Cancelling the operation or setting timeout on how long to wait
|
||||
// for it complete can be done with the passed in context.
|
||||
// Note: There sn't currently any way to interrupt the operation
|
||||
// before all the given resources have been applied to the cluster. Any
|
||||
// cancellation or timeout will only affect how long we wait for the
|
||||
// resources to become current.
|
||||
func (a *Applier) Run(ctx context.Context) <-chan Event {
|
||||
ch := make(chan Event)
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
adapter := &KubectlPrinterAdapter{
|
||||
ch: ch,
|
||||
}
|
||||
// The adapter is used to intercept what is meant to be printing
|
||||
// in the ApplyOptions, and instead turn those into events.
|
||||
a.applyOptions.ToPrinter = adapter.toPrinterFunc()
|
||||
// This provides us with a slice of all the objects that will be
|
||||
// applied to the cluster.
|
||||
infos, _ := a.applyOptions.GetObjects()
|
||||
err := a.applyOptions.Run()
|
||||
if err != nil {
|
||||
// If we see an error here we just report it on the channel and then
|
||||
// give up. Eventually we might be able to determine which errors
|
||||
// are fatal and which might allow us to continue.
|
||||
ch <- Event{
|
||||
EventType: ErrorEventType,
|
||||
ErrorEvent: ErrorEvent{
|
||||
Err: errors.WrapPrefix(err, "error applying resources", 1),
|
||||
},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if a.statusOptions.wait {
|
||||
statusChannel := a.resolver.WaitForStatusOfObjects(ctx, infosToObjects(infos))
|
||||
// As long as the statusChannel remains open, we take every statusEvent,
|
||||
// wrap it in an Event and send it on the channel.
|
||||
for statusEvent := range statusChannel {
|
||||
ch <- Event{
|
||||
EventType: StatusEventType,
|
||||
StatusEvent: statusEvent,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func infosToObjects(infos []*resource.Info) []wait.KubernetesObject {
|
||||
var objects []wait.KubernetesObject
|
||||
for _, info := range infos {
|
||||
u := info.Object.(*unstructured.Unstructured)
|
||||
objects = append(objects, u)
|
||||
}
|
||||
return objects
|
||||
}
|
||||
|
||||
// EventType determines the type of events that are available.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
ErrorEventType EventType = "error"
|
||||
ApplyEventType EventType = "apply"
|
||||
StatusEventType EventType = "status"
|
||||
)
|
||||
|
||||
// Event is the type of the objects that will be returned through
|
||||
// the channel that is returned from a call to Run. It contains
|
||||
// information about progress and errors encountered during
|
||||
// the process of doing apply, waiting for status and doing a prune.
|
||||
type Event struct {
|
||||
// EventType is the type of event.
|
||||
EventType EventType
|
||||
|
||||
// ErrorEvent contains information about any errors encountered.
|
||||
ErrorEvent ErrorEvent
|
||||
|
||||
// ApplyEvent contains information about progress pertaining to
|
||||
// applying a resource to the cluster.
|
||||
ApplyEvent ApplyEvent
|
||||
|
||||
// StatusEvents contains information about the status of one of
|
||||
// the applied resources.
|
||||
StatusEvent wait.Event
|
||||
}
|
||||
|
||||
type ErrorEvent struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
type ApplyEvent struct {
|
||||
Operation string
|
||||
Object runtime.Object
|
||||
}
|
||||
48
cmd/kubectl/kubectlcobra/applier_test.go
Normal file
48
cmd/kubectl/kubectlcobra/applier_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
// Copyright 2019 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// The applier is currently hard to test, as the dependencies on the ApplyOptions and
|
||||
// the resolver are hard to stub out. As we work to better separate the different
|
||||
// responsibilities of the apply functionality, we should also make it easier to test.
|
||||
// This provides some basic tests for now.
|
||||
|
||||
func TestApplierWithUnknownFile(t *testing.T) {
|
||||
tf := cmdtesting.NewTestFactory()
|
||||
defer tf.Cleanup()
|
||||
iostreams, _, _, _ := genericclioptions.NewTestIOStreams()
|
||||
cmd := NewCmdApply("base", tf, iostreams)
|
||||
|
||||
applier := newApplier(tf, iostreams)
|
||||
filenames := []string{"file.yaml"}
|
||||
applier.applyOptions.DeleteFlags.FileNameFlags.Filenames = &filenames
|
||||
|
||||
err := applier.Initialize(cmd)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ch := applier.Run(context.TODO())
|
||||
|
||||
var events []Event
|
||||
for msg := range ch {
|
||||
events = append(events, msg)
|
||||
}
|
||||
|
||||
if !assert.Equal(t, 1, len(events)) {
|
||||
return
|
||||
}
|
||||
|
||||
event := events[0]
|
||||
if !assert.Equal(t, ErrorEventType, event.EventType) {
|
||||
return
|
||||
}
|
||||
assert.Contains(t, event.ErrorEvent.Err.Error(), "does not exist")
|
||||
}
|
||||
62
cmd/kubectl/kubectlcobra/basic_printer.go
Normal file
62
cmd/kubectl/kubectlcobra/basic_printer.go
Normal file
@@ -0,0 +1,62 @@
|
||||
// Copyright 2019 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"strings"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"sigs.k8s.io/kustomize/kstatus/wait"
|
||||
)
|
||||
|
||||
// BasicPrinter is a simple implementation that just prints the events
|
||||
// from the channel in the default format for kubectl.
|
||||
// We need to support different printers for different output formats.
|
||||
type BasicPrinter struct {
|
||||
ioStreams genericclioptions.IOStreams
|
||||
}
|
||||
|
||||
// Print outputs the events from the provided channel in a simple
|
||||
// format on StdOut. As we support other printer implementations
|
||||
// this should probably be an interface.
|
||||
// This function will block until the channel is closed.
|
||||
func (b *BasicPrinter) Print(ch <-chan Event) {
|
||||
for event := range ch {
|
||||
switch event.EventType {
|
||||
case ErrorEventType:
|
||||
cmdutil.CheckErr(event.ErrorEvent.Err)
|
||||
case ApplyEventType:
|
||||
obj := event.ApplyEvent.Object
|
||||
gvk := obj.GetObjectKind().GroupVersionKind()
|
||||
name := "<unknown>"
|
||||
if acc, err := meta.Accessor(obj); err == nil {
|
||||
if n := acc.GetName(); len(n) > 0 {
|
||||
name = n
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(b.ioStreams.Out, "%s %s\n", resourceIdToString(gvk.GroupKind(), name), event.ApplyEvent.Operation)
|
||||
case StatusEventType:
|
||||
statusEvent := event.StatusEvent
|
||||
switch statusEvent.Type {
|
||||
case wait.ResourceUpdate:
|
||||
id := statusEvent.EventResource.ResourceIdentifier
|
||||
gk := id.GroupKind
|
||||
fmt.Fprintf(b.ioStreams.Out, "%s is %s: %s\n", resourceIdToString(gk, id.Name), statusEvent.EventResource.Status.String(), statusEvent.EventResource.Message)
|
||||
case wait.Completed:
|
||||
fmt.Fprint(b.ioStreams.Out, "all resources has reached the Current status\n")
|
||||
case wait.Aborted:
|
||||
fmt.Fprintf(b.ioStreams.Out, "resources failed to the reached Current status\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resourceIdToString returns the string representation of a GroupKind and a resource name.
|
||||
func resourceIdToString(gk schema.GroupKind, name string) string {
|
||||
return fmt.Sprintf("%s/%s", strings.ToLower(gk.String()), name)
|
||||
}
|
||||
@@ -5,6 +5,7 @@
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -79,10 +80,10 @@ func updateHelp(names []string, c *cobra.Command) {
|
||||
|
||||
// NewCmdApply creates the `apply` command
|
||||
func NewCmdApply(baseName string, f util.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
|
||||
o := apply.NewApplyOptions(ioStreams)
|
||||
so := newStatusOptions(f, ioStreams)
|
||||
// Set up grouping object for this apply; used in subsequent prune.
|
||||
o.PreProcessorFn = PrependGroupingObject(o)
|
||||
applier := newApplier(f, ioStreams)
|
||||
printer := &BasicPrinter{
|
||||
ioStreams: ioStreams,
|
||||
}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "apply (-f FILENAME | -k DIRECTORY)",
|
||||
@@ -94,31 +95,28 @@ func NewCmdApply(baseName string, f util.Factory, ioStreams genericclioptions.IO
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if len(args) > 0 {
|
||||
// check is kustomize, if so update
|
||||
o.DeleteFlags.FileNameFlags.Kustomize = &args[0]
|
||||
applier.applyOptions.DeleteFlags.FileNameFlags.Kustomize = &args[0]
|
||||
}
|
||||
|
||||
cmdutil.CheckErr(o.Complete(f, cmd))
|
||||
// Default PostProcessor is configured in "Complete" function,
|
||||
// so the prune must happen after "Complete".
|
||||
o.PostProcessorFn = prune(f, o)
|
||||
cmdutil.CheckErr(o.Run())
|
||||
infos, _ := o.GetObjects()
|
||||
if so.wait {
|
||||
cmdutil.CheckErr(so.waitForStatus(infos))
|
||||
}
|
||||
cmdutil.CheckErr(applier.Initialize(cmd))
|
||||
|
||||
// Create a context with the provided timout from the cobra parameter.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), applier.statusOptions.timeout)
|
||||
defer cancel()
|
||||
// Run the applier. It will return a channel where we can receive updates
|
||||
// to keep track of progress and any issues.
|
||||
ch := applier.Run(ctx)
|
||||
|
||||
// The printer will print updates from the channel. It will block
|
||||
// until the channel is closed.
|
||||
printer.Print(ch)
|
||||
},
|
||||
}
|
||||
|
||||
// bind flag structs
|
||||
o.DeleteFlags.AddFlags(cmd)
|
||||
o.RecordFlags.AddFlags(cmd)
|
||||
o.PrintFlags.AddFlags(cmd)
|
||||
so.AddFlags(cmd)
|
||||
|
||||
o.Overwrite = true
|
||||
applier.SetFlags(cmd)
|
||||
|
||||
cmdutil.AddValidateFlags(cmd)
|
||||
cmd.Flags().BoolVar(&o.ServerDryRun, "server-dry-run", o.ServerDryRun, "If true, request will be sent to server with dry-run flag, which means the modifications won't be persisted. This is an alpha feature and flag.")
|
||||
cmd.Flags().BoolVar(&applier.applyOptions.ServerDryRun, "server-dry-run", applier.applyOptions.ServerDryRun, "If true, request will be sent to server with dry-run flag, which means the modifications won't be persisted. This is an alpha feature and flag.")
|
||||
cmd.Flags().Bool("dry-run", false, "If true, only print the object that would be sent, without sending it. Warning: --dry-run cannot accurately output the result of merging the local manifest and the server-side data. Use --server-dry-run to get the merged result instead.")
|
||||
cmdutil.AddServerSideApplyFlags(cmd)
|
||||
|
||||
|
||||
53
cmd/kubectl/kubectlcobra/printer_adapter.go
Normal file
53
cmd/kubectl/kubectlcobra/printer_adapter.go
Normal file
@@ -0,0 +1,53 @@
|
||||
// Copyright 2019 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/cli-runtime/pkg/printers"
|
||||
)
|
||||
|
||||
// KubectlPrinterAdapter is a workaround for capturing progress from
|
||||
// ApplyOptions. ApplyOptions were originally meant to print progress
|
||||
// directly using a configurable printer. The KubectlPrinterAdapter
|
||||
// plugs into ApplyOptions as a ToPrinter function, but instead of
|
||||
// printing the info, it emits it as an event on the provided channel.
|
||||
type KubectlPrinterAdapter struct {
|
||||
ch chan<- Event
|
||||
}
|
||||
|
||||
// resourcePrinterImpl implements the ResourcePrinter interface. But
|
||||
// instead of printing, it emits information on the provided channel.
|
||||
type resourcePrinterImpl struct {
|
||||
operation string
|
||||
ch chan<- Event
|
||||
}
|
||||
|
||||
// PrintObj takes the provided object and operation and emits
|
||||
// it on the channel.
|
||||
func (r *resourcePrinterImpl) PrintObj(obj runtime.Object, _ io.Writer) error {
|
||||
r.ch <- Event{
|
||||
EventType: ApplyEventType,
|
||||
ApplyEvent: ApplyEvent{
|
||||
Operation: r.operation,
|
||||
Object: obj,
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type toPrinterFunc func(string) (printers.ResourcePrinter, error)
|
||||
|
||||
// toPrinterFunc returns a function of type toPrinterFunc. This
|
||||
// is the type required by the ApplyOptions.
|
||||
func (p *KubectlPrinterAdapter) toPrinterFunc() toPrinterFunc {
|
||||
return func(operation string) (printers.ResourcePrinter, error) {
|
||||
return &resourcePrinterImpl{
|
||||
ch: p.ch,
|
||||
operation: operation,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
49
cmd/kubectl/kubectlcobra/printer_adapter_test.go
Normal file
49
cmd/kubectl/kubectlcobra/printer_adapter_test.go
Normal file
@@ -0,0 +1,49 @@
|
||||
// Copyright 2019 The Kubernetes Authors.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestKubectlPrinterAdapter(t *testing.T) {
|
||||
ch := make(chan Event)
|
||||
buffer := bytes.Buffer{}
|
||||
operation := "operation"
|
||||
|
||||
adapter := KubectlPrinterAdapter{
|
||||
ch: ch,
|
||||
}
|
||||
|
||||
toPrinterFunc := adapter.toPrinterFunc()
|
||||
resourcePrinter, err := toPrinterFunc(operation)
|
||||
assert.NoError(t, err)
|
||||
|
||||
deployment := appsv1.Deployment{
|
||||
TypeMeta: v1.TypeMeta{
|
||||
APIVersion: "apps/v1",
|
||||
Kind: "Deployment",
|
||||
},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: "name",
|
||||
Namespace: "namespace",
|
||||
},
|
||||
}
|
||||
|
||||
// Need to run this in a separate gorutine since go channels
|
||||
// are blocking.
|
||||
go func() {
|
||||
err = resourcePrinter.PrintObj(&deployment, &buffer)
|
||||
}()
|
||||
msg := <-ch
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, operation, msg.ApplyEvent.Operation)
|
||||
assert.Equal(t, &deployment, msg.ApplyEvent.Object)
|
||||
}
|
||||
@@ -4,102 +4,27 @@
|
||||
package kubectlcobra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/scheme"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/kustomize/kstatus/wait"
|
||||
)
|
||||
|
||||
type StatusOptions struct {
|
||||
factory util.Factory
|
||||
ioStreams genericclioptions.IOStreams
|
||||
func NewStatusOptions() *StatusOptions {
|
||||
return &StatusOptions{
|
||||
wait: false,
|
||||
period: 2 * time.Second,
|
||||
timeout: time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
type StatusOptions struct {
|
||||
wait bool
|
||||
period time.Duration
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func newStatusOptions(factory util.Factory, ioStreams genericclioptions.IOStreams) *StatusOptions {
|
||||
return &StatusOptions{
|
||||
factory: factory,
|
||||
ioStreams: ioStreams,
|
||||
|
||||
wait: false,
|
||||
period: 2 * time.Second,
|
||||
timeout: 1 * time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusOptions) AddFlags(c *cobra.Command) {
|
||||
c.Flags().BoolVar(&s.wait, "status", s.wait, "Wait for all applied resources to reach the Current status.")
|
||||
c.Flags().DurationVar(&s.period, "status-period", s.period, "Polling period for resource statuses.")
|
||||
c.Flags().DurationVar(&s.timeout, "status-timeout", s.timeout, "Timeout threshold for waiting for all resources to reach the Current status.")
|
||||
}
|
||||
|
||||
func (s *StatusOptions) waitForStatus(infos []*resource.Info) error {
|
||||
mapper, err := getRESTMapper(s.factory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := getClient(s.factory, mapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
|
||||
defer cancel()
|
||||
|
||||
resolver := wait.NewResolver(c, mapper, s.period)
|
||||
ch := resolver.WaitForStatus(ctx, infosToResourceIdentifiers(infos))
|
||||
|
||||
for msg := range ch {
|
||||
switch msg.Type {
|
||||
case wait.ResourceUpdate:
|
||||
id := msg.EventResource.ResourceIdentifier
|
||||
gk := id.GroupKind
|
||||
fmt.Fprintf(s.ioStreams.Out, "%s/%s is %s: %s\n", strings.ToLower(gk.String()), id.Name, msg.EventResource.Status.String(), msg.EventResource.Message)
|
||||
case wait.Completed:
|
||||
fmt.Fprint(s.ioStreams.Out, "all resources has reached the Current status\n")
|
||||
case wait.Aborted:
|
||||
fmt.Fprintf(s.ioStreams.Out, "resources failed to the reached Current status after %s\n", s.timeout.String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func infosToResourceIdentifiers(infos []*resource.Info) []wait.ResourceIdentifier {
|
||||
var resources []wait.ResourceIdentifier
|
||||
for _, info := range infos {
|
||||
u := info.Object.(*unstructured.Unstructured)
|
||||
resources = append(resources, wait.ResourceIdentifier{
|
||||
GroupKind: u.GroupVersionKind().GroupKind(),
|
||||
Namespace: u.GetNamespace(),
|
||||
Name: u.GetName(),
|
||||
})
|
||||
}
|
||||
return resources
|
||||
}
|
||||
|
||||
func getRESTMapper(f util.Factory) (meta.RESTMapper, error) {
|
||||
return f.ToRESTMapper()
|
||||
}
|
||||
|
||||
func getClient(f util.Factory, mapper meta.RESTMapper) (client.Reader, error) {
|
||||
config, err := f.ToRESTConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user