Skip to content

Commit

Permalink
Merge pull request #958 from turbot/v1.0.x
Browse files Browse the repository at this point in the history
Release Flowpipe v1.0.1 (final)
  • Loading branch information
vhadianto authored Oct 28, 2024
2 parents 41536e6 + 2607cca commit 169d70f
Show file tree
Hide file tree
Showing 31 changed files with 653 additions and 135 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# Flowpipe

## v1.0.2 [tbd]

_Bug fixes_

* Event jsonl output file deletion is now handled correctly. ([#960](https://github.com/turbot/flowpipe/issues/960)).
* `trigger run` command now exits when the execution is paused. ([#962](https://github.com/turbot/flowpipe/issues/962)).


## v1.0.1 [2024-10-25]

_Bug fixes_

* Fix crashing cases when using `--output json`. ([#594](https://github.com/turbot/pipe-fittings/issues/594)).
* Coerce variables set in interactive console to their declared type. ([#595](https://github.com/turbot/pipe-fittings/issues/595)).
* Nested pipelines now correctly pauses parent pipelines. ([#955](https://github.com/turbot/flowpipe/issues/955)).
* Pipeline with `max_concurrency` setting is now automatically paused and will successfully resume. ([#957](https://github.com/turbot/flowpipe/issues/957)).
* `form_url` is now sanitized.


## v1.0.0 [2024-10-22]

_What's new?_
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.19
github.com/sagikazarmark/slog-shim v0.1.0
github.com/turbot/flowpipe-sdk-go v1.0.0
github.com/turbot/pipe-fittings v1.6.0
github.com/turbot/pipe-fittings v1.6.5
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.8.0
)
Expand Down Expand Up @@ -166,6 +166,8 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down Expand Up @@ -201,7 +203,7 @@ require (
github.com/muesli/termenv v0.15.2 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/otiai10/copy v1.14.0 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,10 @@ github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUO
github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand Down Expand Up @@ -794,8 +798,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.28.1 h1:MijcGUbfYuznzK/5R4CPNoUP/9Xvuo20sXfEm6XxoTA=
github.com/onsi/gomega v1.28.1/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down Expand Up @@ -922,8 +926,8 @@ github.com/turbot/flowpipe-sdk-go v1.0.0 h1:q8zrZ/KKECDDCl5139CzPJUXfZTiS0vITOx7
github.com/turbot/flowpipe-sdk-go v1.0.0/go.mod h1:tP5opISn4bPv8ubRN7Kt1xvXJXgDkd8LEV0iAGHDar4=
github.com/turbot/go-kit v0.10.0-rc.0 h1:kd+jp2ibbIV33Hc8SsMAN410Dl9Pz6SJ40axbKUlSoA=
github.com/turbot/go-kit v0.10.0-rc.0/go.mod h1:fFQqR59I5z5JeeBLfK1PjSifn4Oprs3NiQx0CxeSJxs=
github.com/turbot/pipe-fittings v1.6.0 h1:sn4OJPQQR0fggKMNUuyvShJipZG/Ze5jR48Hl12Eokg=
github.com/turbot/pipe-fittings v1.6.0/go.mod h1:1nlRVh18QkYy9eq5pW9gpnoE2VgnQW0Y2zKzrH8Q4kI=
github.com/turbot/pipe-fittings v1.6.5 h1:bHm6dVDUvyw7as8Icxw1Jtld9Kju5OvuS6jtVXorbFo=
github.com/turbot/pipe-fittings v1.6.5/go.mod h1:1nlRVh18QkYy9eq5pW9gpnoE2VgnQW0Y2zKzrH8Q4kI=
github.com/turbot/pipes-sdk-go v0.9.1 h1:2yRojY2wymvJn6NQyE6A0EDFV267MNe+yDLxPVvsBwM=
github.com/turbot/pipes-sdk-go v0.9.1/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
github.com/turbot/steampipe-plugin-code v0.7.0 h1:SROYIo/TI/Q/YNfXK+sAIS71umypUFm1Uz851TmoJkM=
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"math"
"os"
"reflect"
"time"
Expand Down Expand Up @@ -400,7 +401,8 @@ func runTriggerFunc(cmd *cobra.Command, args []string) {
lastStatus = displayProgressLogs(ctx, cmd, pipelineExecutionResponse, pollLogFunc)
default:
// TODO: hack here. We should have a printer for this
triggerResult, err := api.WaitForTrigger(resp.Flowpipe.Name, resp.Flowpipe.ProcessID, 500)
slog.Info("Execution ID:", "executionID", resp.Flowpipe.ProcessID)
triggerResult, err := api.WaitForTrigger(resp.Flowpipe.Name, resp.Flowpipe.ProcessID, math.MaxInt64)
if err != nil {
error_helpers.FailOnErrorWithMessage(err, "failed waiting for trigger")
return
Expand Down
74 changes: 54 additions & 20 deletions internal/es/command/execution_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (h ExecutionPlanHandler) Handle(ctx context.Context, c interface{}) error {
plannerMutex.Unlock()
}()

// slog.Info("Received execution plan command", "execution_id", cmd.Event.ExecutionID)

ex, err := execution.GetExecution(cmd.Event.ExecutionID)
if err != nil {
slog.Error("Error loading execution", "error", err)
Expand Down Expand Up @@ -69,26 +71,6 @@ func (h ExecutionPlanHandler) Handle(ctx context.Context, c interface{}) error {
return nil
}

// check if all pipelines are paused
allPaused := true
for _, pex := range ex.PipelineExecutions {
if pex.Status != "paused" {
allPaused = false
break
}
}

if allPaused {
// raise execution paused
cmd := event.ExecutionPausedFromExecutionPlan(cmd)
err = h.EventBus.Publish(ctx, cmd)
if err != nil {
slog.Error("Error publishing event", "error", err)
return nil
}
return nil
}

// Check if all the pipelines are finished
allFinished := true
for _, pex := range ex.PipelineExecutions {
Expand Down Expand Up @@ -151,7 +133,59 @@ func (h ExecutionPlanHandler) Handle(ctx context.Context, c interface{}) error {
return nil
}
}
}

// check if all pipelines are paused
slog.Info("Checking if all pipelines are paused", "execution_id", cmd.Event.ExecutionID)
allPausedOrFailed := true
for _, pex := range ex.PipelineExecutions {
if pex.Status != "paused" && !slices.Contains(event.EndEvents, pex.Status) {
allPausedOrFailed = false
break
}
}

if allPausedOrFailed {
slog.Info("All pipelines are paused, failed, cancelled or finished", "execution_id", cmd.Event.ExecutionID)
failure := false
// any failure?
for _, pex := range ex.PipelineExecutions {
if pex.Status == "failed" {
failure = true
break
}
}

if failure {
if ex.TriggerExecution != nil {
slog.Info("Raising trigger fail event", "execution_id", cmd.Event.ExecutionID)
cmd := event.TriggerFailedFromExecutionPlan(cmd, ex.TriggerExecution.Name)
err = h.EventBus.Publish(ctx, cmd)
if err != nil {
slog.Error("Error publishing event", "error", err)
return nil
}
return nil
} else {
slog.Info("Raising execution fail event", "execution_id", cmd.Event.ExecutionID)
cmd := event.ExecutionFailedFromExecutionPlan(cmd, perr.InternalWithMessage("pipeline failed"))
err = h.EventBus.Publish(ctx, cmd)
if err != nil {
slog.Error("Error publishing event", "error", err)
return nil
}
return nil
}
}

// raise execution paused
cmd := event.ExecutionPausedFromExecutionPlan(cmd)
err = h.EventBus.Publish(ctx, cmd)
if err != nil {
slog.Error("Error publishing event", "error", err)
return nil
}
return nil
}

// Right now there's not much to do in execution plan, we still need to start with either a single
Expand Down
7 changes: 5 additions & 2 deletions internal/es/command/pipeline_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,12 @@ func (h PipelinePlanHandler) Handle(ctx context.Context, c interface{}) error {
continue
}

maxConcurency := stepDefn.GetMaxConcurrency(evalContext)

nextStep := modconfig.NextStep{
StepName: stepDefn.GetFullyQualifiedName(),
Action: modconfig.NextStepActionStart,
StepName: stepDefn.GetFullyQualifiedName(),
Action: modconfig.NextStepActionStart,
MaxConcurrency: maxConcurency,
}

// Check if there's a for_each.
Expand Down
3 changes: 3 additions & 0 deletions internal/es/command/step_for_each_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (h StepForEachPlanHandler) Handle(ctx context.Context, c interface{}) error
}
nextStep.StepForEach = forEachControl

maxConcurrency := stepDefn.GetMaxConcurrency(evalContext)
nextStep.MaxConcurrency = maxConcurrency

nextSteps = append(nextSteps, nextStep)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/es/estest/default_mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (suite *DefaultModTestSuite) TestNestedWithConn() {
return
}

_, pex, err := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 40, "finished")
_, pex, err := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 100, "finished")
if err != nil {
assert.Fail("Error getting pipeline execution", err)
return
Expand Down Expand Up @@ -844,7 +844,7 @@ func (suite *DefaultModTestSuite) TestInputStepError() {
return
}

_, pex, err := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 40, "failed")
_, pex, err := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 100, "failed")
if err != nil {
assert.Fail("Error getting pipeline execution", err)
return
Expand Down
10 changes: 10 additions & 0 deletions internal/es/estest/test_suite_mod_2/complex_var_pipeline.fp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pipeline "complex_var" {

step "transform" "val" {
value = var.complex_var
}

output "val" {
value = step.transform.val
}
}
13 changes: 12 additions & 1 deletion internal/es/estest/test_suite_mod_2/flowpipe.fpvars
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
notifier = notifier.admin
connection = connection.steampipe.default
connection = connection.steampipe.default

complex_var = {
"first": {
"name": "Billy",
"age": 2,
},
"second": {
"name": "Joe",
"age": 5,
}
}
30 changes: 30 additions & 0 deletions internal/es/estest/test_suite_mod_2/input_foreach_not_as_deep.fp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
pipeline "foreach_parent" {

step "pipeline" "nested_two" {
for_each = ["one", "two", "three"]

pipeline = pipeline.foreach_child

max_concurrency = 1

args = {
foreach_child = each.value
}
}
}

pipeline "foreach_child" {
param "foreach_child" {
type = string
}

step "input" "my_step" {
type = "button"
prompt = "${param.foreach_child} - Do you want to approve?"

option "Approve" {}
option "Deny" {}

notifier = notifier.admin
}
}
39 changes: 38 additions & 1 deletion internal/es/estest/test_suite_mod_2/input_step_nested.fp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pipeline "input_step_parent" {

step "input" "my_step" {
type = "button"
prompt = "Do you want to approve?"
prompt = "input_step_parent - Do you want to approve?"

option "Approve" {}
option "Deny" {}
Expand All @@ -13,6 +13,10 @@ pipeline "input_step_parent" {
step "pipeline" "nested" {
pipeline = pipeline.input_step_child
}

output "val" {
value = step.pipeline.nested
}
}

pipeline "input_step_child" {
Expand All @@ -24,6 +28,39 @@ pipeline "input_step_child" {
step "input" "my_step" {
depends_on = [step.sleep.sleep]

type = "button"
prompt = "input_step_child - Do you want to approve?"

option "Approve" {}
option "Deny" {}

notifier = notifier.admin
}
}

pipeline "parent_with_no_input_step" {

step "pipeline" "nested" {
pipeline = pipeline.input_step_child_with_no_sleep
}

step "pipeline" "nested_2" {
pipeline = pipeline.input_step_child_with_no_sleep
}


output "val" {
value = step.pipeline.nested
}

output "val_2" {
value = step.pipeline.nested_2
}
}

pipeline "input_step_child_with_no_sleep" {

step "input" "my_step" {
type = "button"
prompt = "Do you want to approve?"

Expand Down
Loading

0 comments on commit 169d70f

Please sign in to comment.