ETS tables on steroids!
Shards is an Erlang/Elixir library/tool compatible with the ETS API, that implements Sharding/Partitioning support on top of ETS totally transparent and out-of-box. Shards might be probably the simplest option to scale-out ETS tables.
Additional documentation on cabol.github.io.
Why we might need Sharding on ETS tables? Well, the main reason is to keep the lock contention under control, in order to scale-out ETS tables (linearly) and support higher levels of concurrency without lock issues (specially write-locks) – which most of the cases might cause significant performance degradation.
Therefore, one of the most common and proven strategies to deal with these problems is Sharding/Partitioning – the principle is pretty similar to DHTs.
Here is where Shards comes in. Shards makes extremely easy achieve all this, with zero effort. It provides an API compatible with ETS – with few exceptions. You can check the list of compatible ETS functions that Shards provides HERE.
In your rebar.config
:
{deps, [
{shards, "0.4.1"}
]}.
In your mix.exs
:
def deps do
[{:shards, "~> 0.4"}]
end
$ git clone https://github.com/cabol/shards.git
$ cd shards
$ make
NOTE:
shards
comes with a helper Makefile but it's a simple wrapper on top ofrebar3
, therefore, you can do everything usingrebar3
directly as well.
Now you can start an Erlang console with shards
running:
$ make shell
% let's create a table, such as you would create it with ETS, with 4 shards
> shards:new(mytab1, [{n_shards, 4}]).
mytab1
Exactly as ETS, shards:new/2
function receives 2 arguments, the name of the table and
the options. With shards
there are additional options:
Option | Description | Default |
---|---|---|
{n_shards, pos_integer()} |
Allows to set the desired number of shards. | By default, the number of shards is calculated from the total online schedulers: erlang:system_info(schedulers_online) |
`{scope, l | g}` | Defines shards scope, in other words, if sharding will be applied locally (l ) or global/distributed (g ) |
`{restart_strategy, one_for_one | one_for_all}` | Allows to configure the restart strategy for shards_owner_sup . |
{pick_shard_fun, pick_fun()} |
Function to pick the shard on which the key will be handled locally – used by shards_local . See shards_state. |
shards_local:pick/3 |
{pick_node_fun, pick_fun()} |
Function to pick the node on which the key will be handled globally/distributed – used by shards_dist . See shards_state. |
shards_local:pick/3 |
{nodes, [node()]} |
Allows to set a list of nodes to auto setup a distributed table – the table is created in all given nodes and then all nodes are joined. This option only has effect if the option {scope, g} has been set. |
[] |
NOTE: By default
shards
uses a built-in functions to pick the shard (local scope) and the node (distributed scope) on which the key will be handled. BUT you can override them and set your own functions, they are totally configurable by table, so you can have different tables with different pick-functions each.
Furthermore, when a new table is created, the State is created for that table as well. The reason of the State is to store information related to that table, such as: number of shards, pick functions, etc. For more information go to the State Section.
You can get the State at any time you want:
> shards_state:get(mytab1).
{state,shards_local,4,#Fun<shards_local.pick.3>,
#Fun<shards_local.pick.3>}
% this is a wrapper on top of shards_state:get/1
> shards:state(mytab1).
{state,shards_local,4,#Fun<shards_local.pick.3>,
#Fun<shards_local.pick.3>}
NOTE: For more information about
shards:new/2
see shards.
Let's continue:
% create another one with default number of shards, which is the total of online
% schedulers; in my case is 8 (4 cores, 2 threads each).
% This value is calculated calling: erlang:system_info(schedulers_online)
> shards:new(mytab2, []).
mytab2
% now open the observer so you can see what happened
> observer:start().
ok
You will see the process tree of shards
application. When you create a new "table", what happens behind
is: shards
creates a supervision tree dedicated only to that group of shards that will represent
your logical table in multiple physical ETS tables, and everything is handled auto-magically by shards
,
you only have to use the API like if you were working with a common ETS table.
Now let's execute some write/read operations against the created shards
:
% inserting some objects
> shards:insert(mytab1, [{k1, 1}, {k2, 2}, {k3, 3}]).
true
% let's check those objects
> shards:lookup(mytab1, k1).
[{k1,1}]
> shards:lookup(mytab1, k2).
[{k2,2}]
> shards:lookup(mytab1, k3).
[{k3,3}]
> shards:lookup(mytab1, k4).
[]
% delete an object and then check
> shards:delete(mytab1, k3).
true
> shards:lookup(mytab1, k3).
[]
% now let's find all stored objects using select
> MatchSpec = ets:fun2ms(fun({K, V}) -> {K, V} end).
[{{'$1','$2'},[],[{{'$1','$2'}}]}]
> shards:select(mytab1, MatchSpec).
[{k1,1},{k2,2}]
As you may have noticed, it's extremely easy, because almost all ETS functions are
implemented by shards, it's only matters of replace ets
module by shards
.
Shards behaves in elastic way, as you saw, more shards can be added/removed dynamically:
> shards:delete(mytab1).
true
> observer:start().
ok
See how shards
gets shrinks.
In the previous section we saw something about the state
, how it can be fetched at any time.
But, what is the state
?
There are different properties that have to be stored somewhere in order shards
works
correctly. Remember, shards
has a logic on top of ETS
, for example, compute the
shard/node where the key/value pair goes, and to do that, it needs the number of shards,
the function to pick the shard or node (in case of global scope), the table type and
of course, the module to use depending on the scope (shards_local
or shards_dist
).
Because of that, when a new table is created using shards
, a new supervision tree
is created as well to represent that table. The supervisor is shards_owner_sup
and
it has a control ETS table to save the state
so it can be fetched later at any time.
The shards
state is defined as:
-record(state, {
module = shards_local :: module(),
n_shards = ?N_SHARDS :: pos_integer(),
pick_shard_fun = fun shards_local:pick/3 :: pick_fun(),
pick_node_fun = fun shards_local:pick/3 :: pick_fun()
}).
But this record is totally transparent for you, shards
provides a dedicated module
to handle the state
: shards_state. With this utility module,
you can fetch the state, get any property and also other functions.
The module shards
is a wrapper on top of two main modules:
- shards_local: Implements Sharding on top of ETS tables, but locally (on a single Erlang node).
- shards_dist: Implements Sharding but across multiple distributed Erlang nodes, which must
run
shards
locally, sinceshards_dist
usesshards_local
internally. We'll cover the distributed part later.
When you use shards
on top of shards_local
, a call to the control ETS table owned by shards_owner_sup
must be done, in order to recover the State, mentioned previously.
Most of the shards_local
functions receives the State as parameter, so it must be fetched before
to call it. You can check how shards
module is implemented HERE.
If any microsecond matters to you, you can skip the call to the control ETS table by calling
shards_local
directly. We have two options:
-
The first option is getting the
state
, and passing it as argument. Now the question is: how to get the State? Well, it's extremely easy, you can get thestate
when you callshards:new/2
by first time, or you can callshards:state/1
,shards_state:get/1
orshards_state:new/0,1,2,3,4
at any time you want, and then it might be stored within the calling process, or wherever you want. E.g.:% take a look at the 2nd element of the returned tuple, that is the state > shards:new(mytab, [{n_shards, 4}]). mytab % remember you can get the state at any time you want > State = shards:state(mytab). {state,shards_local,4,#Fun<shards_local.pick.3>, #Fun<shards_local.pick.3>} % now you can call shards_local directly > shards_local:insert(mytab, {1, 1}, State). true > shards_local:lookup(mytab, 1, State). [{1,1}] % in this case, only the n_shards is different from default, so you % can do this: > shards_local:lookup(mytab, 1, shards_state:new(4)). [{1,1}]
-
The 2nd option is to call
shards_local
directly without thestate
, but this is only possible if you have created a table with defaultshards
options – such asn_shards
,pick_shard_fun
andpick_node_fun
. If you can take this option it might be significantly better, since in this case no additional calls are needed, not even to recover thestate
(like in the previous option), because a newstate
is created with default values. Therefore, the call is mapped directly to an ETS function. E.g.:% create a table without set n_shards, pick_shard_fun or pick_node_fun > shards:new(mytab, []). mytab % call shards_local without the state > shards_local:insert(mytab, {1, 1}). true > shards_local:lookup(mytab, 1). [{1,1}]
Most of the cases this is not necessary, shards
wrapper is more than enough, it adds only a
few microseconds of latency. In conclusion, Shards gives you the flexibility to do it,
but it's your call!
So far, we've seen how Shards works but locally, now let's see how Shards works but distributed.
1. Let's start 3 Erlang consoles running shards:
Node a
:
$ rebar3 shell --sname a@localhost
Node b
:
$ rebar3 shell --sname b@localhost
Node c
:
$ rebar3 shell --sname c@localhost
2. Create a table with global scope ({scope, g}
) on each node and then join them.
There are two ways to achieve this:
-
Manually, create the table on each node and then from any of them, join the rest.
Create the table on each node:
% when a tables is created with {scope, g}, the module shards_dist is used % internally by shards > shards:new(mytab, [{scope, g}]). mytab
Join them. From node
a
, joinb
andc
nodes:> shards:join(mytab, ['b@localhost', 'c@localhost']). [a@localhost,b@localhost,c@localhost]
Let's check that all nodes have the same nodes running next function on each node:
> shards:get_nodes(mytab). [a@localhost,b@localhost,c@localhost]
-
The easier way, call
shards:new/3
but passing the option{nodes, Nodes}
, whereNodes
is the list of nodes you want to join.From Node
a
:> shards:new(mytab, [{scope, g}, {nodes, ['b@localhost', 'c@localhost']}]). mytab
Let's check again on all nodes:
> shards:get_nodes(mytab). [a@localhost,b@localhost,c@localhost]
3. Now Shards cluster is ready, let's do some basic operations:
From node a
:
> shards:insert(mytab, [{k1, 1}, {k2, 2}]).
true
From node b
:
> shards:insert(mytab, [{k3, 3}, {k4, 4}]).
true
From node c
:
> shards:insert(mytab, [{k5, 5}, {k6, 6}]).
true
Now, from any of previous nodes:
> [shards:lookup_element(mytab, Key, 2) || Key <- [k1, k2, k3, k4, k5, k6]].
[1,2,3,4,5,6]
All nodes should return the same result.
Let's do some deletions, from any node:
> shards:delete(mytab, k6).
true
And again, let's check it out from any node:
% as you can see 'k6' was deleted
> shards:lookup(mytab, k6).
[]
% check remaining values
> [shards:lookup_element(mytab, Key, 2) || Key <- [k1, k2, k3, k4, k5]].
[1,2,3,4,5]
NOTE: This module is still under continuous development. So far, only few basic functions have been implemented.
-
ExShards is an Elixir wrapper for
shards
– with extra and nicer functions. -
KVX – Simple/basic Elixir in-memory Key/Value Store using
shards
(default adapter). -
ErlBus uses
shards
to scale-out Topics/Pids table(s), which can be too large and with high concurrency level. -
Cacherl uses
shards
to implement a Distributed Cache.
$ make test
You can find tests results in _build/test/logs
, and coverage in _build/test/cover
.
NOTE: You can find some performance tests in this BLOG POST.
$ make edoc
Note: Once you run previous command, a new folder
doc
is created, and you'll have a pretty nice HTML documentation.
Copyright (c) 2016 Carlos Andres Bolaños R.A.
Shards source code is licensed under the MIT License.