Closed
Description
Context
I can replicate a pipeline execution freeze under a very specific set of circumstances; this one was hard to identify the circumstances that caused it but after a bit of trial & error I was able to isolate the flow.
When a pipeline
receives a collection and we then attempt to use an if/for_each/max_concurrency=1 on two options where the last listed option is a pipeline
that would be skipped based on the if
statement; the pipeline execution will halt after skipping the first item in the for_each.
Note: This may require running against a server
Pipeline to reproduce
locals {
use_message = "yes"
}
pipeline "caller" {
step "pipeline" "step_name" {
pipeline = pipeline.receiver
args = {
a = [
{
name = "Bob"
message = "some message here"
},
{
name = "NotBob"
message = "different message"
}
]
}
}
}
pipeline "receiver" {
param "a" {
type = any
}
step "transform" "key_index" {
value = {for item in param.a : item.name => item}
}
step "message" "example" {
if = local.use_message == "yes"
for_each = step.transform.key_index.value
notifier = notifier["default"]
text = "[Example] Key: ${each.key} -> ${each.value.message}"
}
step "pipeline" "other" {
if = local.use_message != "yes"
for_each = step.transform.key_index.value
max_concurrency = 1
pipeline = pipeline.receiver2
args = {
item = each.value
}
}
}
pipeline "receiver2" {
param "item" {
type = any
}
step "message" "example" {
notifier = notifier["default"]
text = "[Other] ${param.item.name}"
}
}
Result of pipeline
Client:
❯ flowpipe pipeline run caller --host local
[flowpipe] Execution ID: exec_col29u24jkdkpb9fccf0
[caller] Starting pipeline
[caller.step_name] Starting pipeline
[receiver] Starting pipeline
[receiver.key_index] Starting transform
[receiver.key_index] Complete 1ms
[receiver.example[Bob]] Starting message: [Example] Key: Bob -> some message here
[receiver.example[Bob]] Arg text = [Example] Key: Bob -> some message here
[receiver.example[Bob]] Complete 26ms
[receiver.example[NotBob]] Starting message: [Example] Key: NotBob -> different message
[receiver.example[NotBob]] Arg text = [Example] Key: NotBob -> different message
[receiver.example[NotBob]] Complete 18ms
[receiver.other[Bob]] Skipped
Server:
[flowpipe] Started v0.4.4
[flowpipe] Listening on all network interfaces, port 7103
[flowpipe] Loaded mod.aws_compliance
[trigger.query.detect_and_respond_to_iam_access_keys_older_than_max_days] Disabled
[trigger.query.detect_and_respond_to_s3_buckets_with_versioning_disabled] Disabled
[trigger.query.detect_and_respond_to_ec2_instances_in_unapproved_regions] Disabled
[flowpipe] Press Ctrl+C to exit
exec_col29u24jkdkpb9fccf0 [caller] Starting
exec_col29u24jkdkpb9fccf0 [caller.step_name] Starting pipeline
exec_col29u24jkdkpb9fccf0 [receiver] Starting
exec_col29u24jkdkpb9fccf0 [receiver.key_index] Starting transform
exec_col29u24jkdkpb9fccf0 [receiver.key_index] Complete 1ms
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Starting message: [Example] Key: Bob -> some message here
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Arg text = [Example] Key: Bob -> some message here
exec_col29u24jkdkpb9fccf0 [receiver.example[Bob]] Complete 26ms
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Starting message: [Example] Key: NotBob -> different message
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Arg text = [Example] Key: NotBob -> different message
exec_col29u24jkdkpb9fccf0 [receiver.example[NotBob]] Complete 18ms
Flowpipe Version
❯ flowpipe --version
Flowpipe v0.4.4
Activity