remove break after input in pipeline

This commit is contained in:
Donny Xia
2020-08-20 11:54:56 -07:00
parent 037ac3b134
commit 501684a9c6
3 changed files with 48 additions and 13 deletions

View File

@@ -77,6 +77,6 @@ func (r *SourceRunner) runE(c *cobra.Command, args []string) error {
inputs = []kio.Reader{&kio.ByteReader{Reader: c.InOrStdin()}} inputs = []kio.Reader{&kio.ByteReader{Reader: c.InOrStdin()}}
} }
err := kio.Pipeline{Inputs: inputs, Outputs: outputs, ContinueIfInputEmpty: true}.Execute() err := kio.Pipeline{Inputs: inputs, Outputs: outputs}.Execute()
return handleError(c, err) return handleError(c, err)
} }

View File

@@ -75,11 +75,6 @@ type Pipeline struct {
// Outputs are where the transformed Resource Configuration is written. // Outputs are where the transformed Resource Configuration is written.
Outputs []Writer `yaml:"outputs,omitempty"` Outputs []Writer `yaml:"outputs,omitempty"`
// ContinueIfInputEmpty indicates should pipeline continue when the
// the input result is empty. This is useful when we want to run outputs
// even the input is empty.
ContinueIfInputEmpty bool
} }
// Execute executes each step in the sequence, returning immediately after encountering // Execute executes each step in the sequence, returning immediately after encountering
@@ -104,10 +99,6 @@ func (p Pipeline) ExecuteWithCallback(callback PipelineExecuteCallbackFunc) erro
} }
result = append(result, nodes...) result = append(result, nodes...)
} }
if len(result) == 0 && !p.ContinueIfInputEmpty {
// no inputs to operate on
return nil
}
// apply operations // apply operations
var err error var err error
@@ -117,6 +108,9 @@ func (p Pipeline) ExecuteWithCallback(callback PipelineExecuteCallbackFunc) erro
callback(op) callback(op)
} }
result, err = op.Filter(result) result, err = op.Filter(result)
// TODO: This len(result) == 0 should be removed and empty result list should be
// handled by outputs. However currently the some writer like LocalPackageReadWriter
// will clear the output directory and which will cause unpredictable results
if len(result) == 0 || err != nil { if len(result) == 0 || err != nil {
return errors.Wrap(err) return errors.Wrap(err)
} }

View File

@@ -78,7 +78,7 @@ func TestPipelineWithCallback(t *testing.T) {
} }
} }
func TestContinueIfInputEmpty(t *testing.T) { func TestEmptyInput(t *testing.T) {
actual := &bytes.Buffer{} actual := &bytes.Buffer{}
output := ByteWriter{ output := ByteWriter{
Sort: true, Sort: true,
@@ -88,8 +88,7 @@ func TestContinueIfInputEmpty(t *testing.T) {
output.Writer = actual output.Writer = actual
p := Pipeline{ p := Pipeline{
Outputs: []Writer{output}, Outputs: []Writer{output},
ContinueIfInputEmpty: true,
} }
err := p.Execute() err := p.Execute()
@@ -108,3 +107,45 @@ items: []
t.FailNow() t.FailNow()
} }
} }
func TestEmptyInputWithFilter(t *testing.T) {
actual := &bytes.Buffer{}
output := ByteWriter{
Sort: true,
WrappingKind: ResourceListKind,
WrappingAPIVersion: ResourceListAPIVersion,
}
output.Writer = actual
filters := []Filter{
FilterFunc(func(nodes []*yaml.RNode) ([]*yaml.RNode, error) {
nodes = append(nodes, yaml.NewMapRNode(&map[string]string{
"foo": "bar",
}))
return nodes, nil
}),
FilterFunc(func(nodes []*yaml.RNode) ([]*yaml.RNode, error) { return nodes, nil }),
}
p := Pipeline{
Outputs: []Writer{output},
Filters: filters,
}
err := p.Execute()
if err != nil {
t.Fatal(err)
}
expected := `
apiVersion: config.kubernetes.io/v1alpha1
kind: ResourceList
items:
- foo: bar
`
if !assert.Equal(t,
strings.TrimSpace(expected), strings.TrimSpace(actual.String())) {
t.FailNow()
}
}