Skip to content

antonmi/ALF

Repository files navigation

ALF

Hex.pm

Flow-based Application Layer Framework

ALF is a set of flow-control abstractions built on top Elixir GenStage which allows writing programs following the Flow-Based Programming (FBP) approach.

ALF is a framework for your application layer, it provides a simple and expressive way of presenting the logic as sequential processing of "information packets" (IPs) (or, simply, messages or events), and thus brings high design-, coding-, and run-time observability.

ALF is NOT a general-purpose "language", so implementing a complex domain logic with it might be questionable (although it is possible).

ALF is a successor of the Flowex project. Check it's README to get the general idea. ALF adds conditional branching, packet cloning, goto statement and other functionalities. Therefore, one can create application trees (graphs) of arbitrary complexity.

Something to read and watch

Flow-Based Programming with Elixir and ALF - Code BEAM 2022

ALF — Flow-based Application Layer Framework - post

Where is Application Layer - post

ALF - Flow-based Application Layer Framework - Sydney Elixir Meetup

Broadway, Flow?

What's the difference between ALF and Broadway or Flow?

The short answer is: Broadway and Flow are tools for processing streams of data while ALF is a framework for writing general business logic.

The three libraries are build on top of the GenStage library, so they are similar in a sense that there are GenStages in which you can put your own code. But the focus is completely different.

Flow focuses on "computations on collections, similar to the Enum and Stream modules", it's a quite low-level tool for processing large collections of data.

Broadway is about "data ingestion and data processing pipelines". The main abstraction are "data processors", there are lots of adapters to different data sources and so on.

ALF is NOT about data-processing (although you can easily do it with ALF). It's about a FBP-way to build your application layer logic.

Installation

Just add :alf as dependency to your mix.exs file.

  defp deps do
    [
      {:alf, "~> 0.11"}
    ]
  end

ALF starts its own supervisor (ALF.DynamicSupervisor). All the pipelines and managers are started under the supervisor

Important notice!

In the version "0.8" the interface was changed significantly.

And now all the interactions go through the pipeline module itself.

# start and stop
MyPipeline.start() # instead of ALF.Manager.start(MyPipeline)
MyPipeline.stop()  # instead of ALF.Manager.stop(MyPipeline) 

# send events
MyPipeline.call(event, opts \\ []) # new
MyPipeline.stream(enumerable, opts \\ []) # instead of ALF.Manager.stream_to(enumerable, MyPipeline)
MyPipeline.cast(event, opts \\ []) # new

Quick start

Read a couple of sections of Flowex README to get the basic idea of how your code is put to GenStages.

Define your pipeline

A pipeline is a list of components defined in the @components module variable.

defmodule ThePipeline do
  use ALF.DSL

  @components [
    stage(:add_one),
    stage(:mult_by_two),
    stage(:minus_three)
  ]

  def add_one(event, _opts), do: event + 1
  def mult_by_two(event, _opts), do: event * 2
  def minus_three(event, _opts), do: event - 3
end

Start the pipeline

:ok = ThePipeline.start()

This starts a manager (GenServer) with the ThePipeline name. The manager starts all the components and puts them under another supervision tree.

alt text

Use the pipeline

There are several ways you can run your pipeline. There are call/2, cast/2 and stream/2 functions.

call/2 calls the pipeline and blocks the caller process until the result is returned.

ThePipeline.call(1) # returns 1
ThePipeline.call(2, debug: true) # it returns %ALP.IP{} struct

cast/2 sends event to the pipeline and returns the IP reference immediately.

ThePipeline.cast(1) # returns reference like #Reference<0.3669068923.1709703170.126245>

One can actually receive the result of the cast/2 back with send_result: true option.

ref = ThePipeline.cast(1, send_result: true)
receive do
  {^ref, %ALF.IP{event: event}} -> 
    event
end

stream/2 is a bit different. It receives a stream or Enumerable.t and returns another stream where results will be streamed.

inputs = [1,2,3]
output_stream = ThePipeline.stream(inputs)
Enum.to_list(output_stream) # it returns [1, 3, 5]

The debug: true option also works for streams

Parallel processing of several streams

The ALF pipeline can handle arbitrary amount of events streams in parallel. For example:

 stream1 = ThePipeline.stream(0..9)
 stream2 = ThePipeline.stream(10..19)
 stream3 = ThePipeline.stream(20..29)

 [result1, result2, result3] =
   [stream1, stream2, stream3]
   |> Enum.map(&Task.async(fn -> Enum.to_list(&1) end))
   |> Task.await_many()

Check test/examples folder for more examples

Synchronous evaluation

There are cases when you don't need the underlying gen_stage infrastructure (a separate process for each component). E.g. in tests, or if you debug a wierd error. There is a possibility to run a pipeline synchronously, when everything is run in one process. Just pass sync: true option to the start function.

:ok = ThePipeline.start(sync: true)

There are only call/2 and stream/2 functions available in this mode, no cast/2.

The main idea behind ALF DSL

User's code that is evaluated inside components must be defined either as a 2-arity function or as a module with the call/2 function. The name of the function/module goes as a first argument in DSL. And the name also become the component's name.

  stage(:my_fun)
  # or
  stage(MyComponent)

where my_fun is

def my_fun(event, opts) do
  #logic is here
  new_event
end

and MyComponent is

defmodule MyComponent do
  # optional
  def init(opts), do: %{opts | foo: :bar}

  def call(event, opts) do
    # logic is here
    new_event
  end
end

Most of the components accept the opts argument, the options will be passed as a second argument to the corresponding function.

  stage(MyComponent, opts: [foo: :bar])

Check @dsl_options in lib/components for available options.

Components overview

ALF components

Stage

Stage is the stateless component where one puts a piece of application logic. It might be a simple 2-arity function or a module with call/2 function:

  stage(:my_fun, opts: %{foo: bar})
  # or
  stage(MyComponent, opts: %{})

where MyComponent is

defmodule MyComponent do
  # optional
  def init(opts), do: %{opts | foo: :bar}

  def call(event, opts) do
    # logic is here
    new_datum
  end
end

There is the :count option that allows running several copies of a stage.

  stage(:my_fun, count: 5)

Use it for controlling parallelism.

Composer

Composer is a stateful component, the state of the composer ("memo") can be updated on each event.

Think about Enum.reduce/3, where one can set the initial value of accumulator and then return a new value after each iteration.

composer(:sum, memo: 0)

The implementation function is a 3-arity function with the "memo" as the second argument.

The function must return a {list(event), new_memo} tuple.

def sum(event, memo, _opts) do
  new_memo = memo + event
  {[event], new_memo} 
end

The first element in the returned value is a list of events, meaning that a composer can produce many events or no events at all.

Both, having memory and the ability to return several events give the composer component a huge power.

See, for example, the telegram_test.exs example which solves the famous "Telegram Problem".

Composer vs Stage

One may have noticed that the stage component is actually a special case of the composer - no memory, only one event is returned.

However, I would keep them as separate components in order to explicitly separate stateless and stateful transformation in a pipeline.

Switch

Switch allows to forward IP (information packets) to different branches:

switch(:my_switch_function,
        branches: %{
          part1: [stage(:foo)],
          part2: [stage(:bar)]
        },
        opts: [foo: :bar]
      )
# or with module
switch(MySwitchModule, ...)

The my_switch_function function is 2-arity function that must return the key of the branch:

def my_switch_function(event, opts) do
  if event == opts[:foo], do: :part1, else: :part2
end

# or

defmodule MySwitchModule do
  # optional
  def init(opts), do: %{opts | baz: :qux}

  def call(event, opts) do
    if event == opts[:foo], do: :part1, else: :part2
  end
end

Goto

Send packet to a given goto_point

goto(:my_goto_function, to: :my_goto_point, opts: [foo: :bar])
# or
goto(MyGotoModule, to: :my_goto_point, opts: [foo: :bar])

The function function is 2-arity function that must return true of false

def my_goto_function(event, opts) do
  event == opts[:foo]
end

GotoPoint

The Goto component companion

goto_point(:goto_point)

Done

If the condition_fun returns truthy value, the event will go directly to the consumer. It allows to exit early if the work is already done or when the controlled error occurred in execution flow. The same behavior can be also implemented using "switch" or "goto", however the "done" component is much simpler.

done(:condition_fun)

DeadEnd

Event won't propagate further.

dead_end(:dead_end)

Implicit components

Implicit components

Producer and Consumer

Nothing special to know, these are internal components that put at the beginning and at the end of your pipeline.

Plug and Unplug

Plug and Unplug are used for transforming events before and after reusable parts of a pipeline. The components can not be used directly and are generated automatically when one use plug_with macro. See below.

Components / Pipeline reusing

from macro

One can easily include components from another pipeline:

defmodule ReusablePipeline do
  use ALF.DSL
  @components [
    stage(:foo),
    stage(:bar)
  ]
end

defmodule ThePipeline do
  use ALF.DSL
  @components from(ReusablePipeline) ++ [stage(:baz)]
end

plug_with macro

Use the macro if you include other components that expect different type/format/structure of input events.

defmodule ThePipeline do
  use ALF.DSL

  @components [
                plug_with(AdapterModuleBaz, do: [stage(:foo), stage(:bar)])
              ] ++
                plug_with(AdapterModuleForReusablePipeline) do
                  from(ReusablePipeline)
                end

end

plug_with adds Plug component before the components in the block and Unplug at the end. The first argument is an "adapter" module which must implement the plug/2 and unplug/3 functions

def AdapterModuleBaz do
  def init(opts), do: opts # optional

  def plug(event, _opts) do
    # The function is called inside the `Plug` component.
    # `event` will be put on the "AdapterModuleBaz" until IP has reached the "unplug" component.
    # The function must return `new_event` with the structure expected by the following component
    new_event
  end

  def unplug(event, prev_event, _opts) do
    # here one can access previous "event" in `prev_event`
    # transform the event back for the following components.
    new_event
  end
end

Diagrams

The amazing thing with the FBP approach is that one can easily visualize the application logic. Below there are several ALF-diagrams for the examples in test/examples.

Bubble sort

Bubble sort

Bubble sort with Switch

Bubble sort with Switch

Tic-tac-toe

See tictactoe repo Tic-Tac-Toe

Releases

No releases published

Packages

No packages published

Languages