Pipeline execution halted under certain circumstances #836

Closed
@graza-io

Description

Context

I can reproduce a pipeline execution freeze under a very specific set of circumstances. The triggering conditions were hard to identify, but after some trial and error I was able to isolate the flow.

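When a pipeline receives a collection and then uses if, for_each and max_concurrency = 1 to choose between two alternative steps, and the last-listed alternative is a pipeline step that the if condition skips, pipeline execution halts after skipping the first item in the for_each. In the run below, the last client line is [receiver.other[Bob]] Skipped; receiver.other[NotBob] is never reported and the execution does not complete.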

Note: This may require running against a server

Pipeline to reproduce

locals {
  use_message = "yes"
}

pipeline "caller" {
  step "pipeline" "step_name" {
    pipeline = pipeline.receiver
    args     = {
      a = [
        {
          name = "Bob"
          message = "some message here"
        },
        {
          name = "NotBob"
          message = "different message"
        }
      ]
    }
  }
}

pipeline "receiver" {
  param "a" {
    type = any
  }

  step "transform" "key_index" {
    value = {for item in param.a : item.name => item}
  }

  step "message" "example" {
    if       = local.use_message == "yes"
    for_each = step.transform.key_index.value
    notifier = notifier["default"]
    text     = "[Example] Key: ${each.key} -> ${each.value.message}"
  }

  step "pipeline" "other" {
    if              = local.use_message != "yes"
    for_each        = step.transform.key_index.value
    max_concurrency = 1
    pipeline        = pipeline.receiver2
    args = {
      item = each.value
    }
  }
}

pipeline "receiver2" {
  param "item" {
    type = any
  }

  step "message" "example" {
    notifier = notifier["default"]
    text     = "[Other] ${param.item.name}"
  }
}
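
For comparison, a control variant of the "other" step in the receiver pipeline may help narrow down which attribute is involved. This is only a sketch: it assumes, based on the description above, that max_concurrency = 1 is part of the trigger, and it has not been verified that removing it avoids the halt. Everything else is unchanged from the reproduction.

```hcl
  # Control variant (assumption: max_concurrency = 1 contributes to the halt).
  # Drop-in replacement for step "pipeline" "other" inside pipeline "receiver".
  step "pipeline" "other" {
    if       = local.use_message != "yes"
    for_each = step.transform.key_index.value
    # max_concurrency intentionally omitted for this comparison run
    pipeline = pipeline.receiver2
    args = {
      item = each.value
    }
  }
```

If the run completes with this variant and both receiver.other[Bob] and receiver.other[NotBob] are reported as skipped, that would point at the interaction between a skipped for_each pipeline step and max_concurrency.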

Result of pipeline

Client:

❯ flowpipe pipeline run caller --host local
[flowpipe] Execution ID: exec_col29u24jkdkpb9fccf0
[caller] Starting pipeline
[caller.step_name] Starting pipeline
[receiver] Starting pipeline
[receiver.key_index] Starting transform
[receiver.key_index] Complete 1ms
[receiver.example[Bob]] Starting message: [Example] Key: Bob -> some message here
[receiver.example[Bob]] Arg text = [Example] Key: Bob -> some message here
[receiver.example[Bob]] Complete 26ms
[receiver.example[NotBob]] Starting message: [Example] Key: NotBob -> different message
[receiver.example[NotBob]] Arg text = [Example] Key: NotBob -> different message
[receiver.example[NotBob]] Complete 18ms
[receiver.other[Bob]] Skipped

Server:

[flowpipe] Started v0.4.4
[flowpipe] Listening on all network interfaces, port 7103
[flowpipe] Loaded mod.aws_compliance
[trigger.query.detect_and_respond_to_iam_access_keys_older_than_max_days] Disabled
[trigger.query.detect_and_respond_to_s3_buckets_with_versioning_disabled] Disabled
[trigger.query.detect_and_respond_to_ec2_instances_in_unapproved_regions] Disabled
[flowpipe] Press Ctrl+C to exit
exec_col29u24jkdkpb9fccf0 [caller] Starting
exec_col29u24jkdkpb9fccf0 [caller.step_name] Starting pipeline
exec_col29u24jkdkpb9fccf0 [receiver] Starting
exec_col29u24jkdkpb9fccf0 [receiver.key_index] Starting transform
exec_col29u24jkdkpb9fccf0 [receiver.key_index] Complete 1ms
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Starting message: [Example] Key: Bob -> some message here
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Arg text = [Example] Key: Bob -> some message here
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Complete 26ms
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Starting message: [Example] Key: NotBob -> different message
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Arg text = [Example] Key: NotBob -> different message
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Complete 18ms

Flowpipe Version

❯ flowpipe --version
Flowpipe v0.4.4

Labels

bug (Something isn't working)
