Skip to content

Commit

Permalink
DynamoStore: Handle removal of "a" fields from calves (#222)
Browse files Browse the repository at this point in the history
* Handle removal of "a" fields from calves
* Target Eqx 4.0.0-rc.12
  • Loading branch information
bartelink authored Jul 5, 2023
1 parent 1b4221b commit ed27380
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
9 changes: 7 additions & 2 deletions src/Propulsion.DynamoStore.Indexer/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : Amazon.Lambda.DynamoDBE
| ot when ot = OperationType.INSERT || ot = OperationType.MODIFY ->
let p = record.Dynamodb.Keys["p"].S
let sn, n = IndexStreamId.ofP p, int64 updated["n"].N
let appendedLen = int updated["a"].N
if p.StartsWith AppendsEpoch.Category || p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
elif p.StartsWith '$' then systemStreams <- systemStreams + 1
elif appendedLen = 0 then noEvents <- noEvents + 1
else
// Equinox writes all enter via the Tip. The "a" field of the tip indicates how many events were pushed in this insert/update
// Calf entries will not have a "a" field (pre-release versions of DynamoStore previously wrote directly to the calf)
let appendedLen = match updated.TryGetValue "a" with true, v -> int v.N | false, _ -> 0
// If nothing was written (e.g. the write was transmuted to a snapshot update only)
// then the count may be 0, in which case skip parse
if appendedLen = 0 then noEvents <- noEvents + 1 else

let allBatchEventTypes = [| for x in updated["c"].L -> x.S |]
match allBatchEventTypes |> Array.skip (allBatchEventTypes.Length - appendedLen) with
| [||] -> ()
Expand Down
24 changes: 14 additions & 10 deletions src/Propulsion.DynamoStore.Notifier/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : DynamoDBEvent) : KeyVal
let p = record.Dynamodb.Keys["p"].S
match FsCodec.StreamName.parse p with
| AppendsEpoch.StreamName (partitionId, epochId) ->
match int64 updated["a"].N with
| 0L -> noEvents <- noEvents + 1
| appendedLen ->
let n = int64 updated["n"].N
let i = n - appendedLen
summary.Append(partitionId).Append('/').Append(epochId).Append(' ').Append(appendedLen).Append('@').Append(i) |> ignore
let eventTypes = updated["c"].L
let isClosed = eventTypes[eventTypes.Count - 1].S |> AppendsEpoch.Events.isEventTypeClosed
let checkpoint = Checkpoint.positionOfEpochClosedAndVersion epochId isClosed n
updateTails partitionId checkpoint
// Calf writes won't have an "a" field
let appendedLen = match updated.TryGetValue "a" with true, v -> int64 v.N | false, _ -> 0
// Tip writes may not actually have added events, if the sync was transmuted to an update of the unfolds only
// In such cases, we would not want to trigger a downstream write
// (This should not actually manifest for AppendsEpoch streams; mentioning for completeness)
if appendedLen = 0 then noEvents <- noEvents + 1 else

let n = int64 updated["n"].N
let i = n - appendedLen
summary.Append(partitionId).Append('/').Append(epochId).Append(' ').Append(appendedLen).Append('@').Append(i) |> ignore
let eventTypes = updated["c"].L
let isClosed = eventTypes[eventTypes.Count - 1].S |> AppendsEpoch.Events.isEventTypeClosed
let checkpoint = Checkpoint.positionOfEpochClosedAndVersion epochId isClosed n
updateTails partitionId checkpoint
| _ ->
if p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
else otherStream <- otherStream + 1
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.12" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.12" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.12" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit ed27380

Please sign in to comment.