Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoStore: Handle removal of "a" fields from calves #222

Merged
merged 3 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Handle removal of "a" fields from calves
  • Loading branch information
bartelink committed Jun 29, 2023
commit 354d6a3dd17caeb38229e7034ea73aecd8554723
9 changes: 7 additions & 2 deletions src/Propulsion.DynamoStore.Indexer/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : Amazon.Lambda.DynamoDBE
| ot when ot = OperationType.INSERT || ot = OperationType.MODIFY ->
let p = record.Dynamodb.Keys["p"].S
let sn, n = IndexStreamId.ofP p, int64 updated["n"].N
let appendedLen = int updated["a"].N
if p.StartsWith AppendsEpoch.Category || p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
elif p.StartsWith '$' then systemStreams <- systemStreams + 1
elif appendedLen = 0 then noEvents <- noEvents + 1
else
// Equinox writes all enter via the Tip. The "a" field of the tip indicates how many events were pushed in this insert/update
// Calf entries will not have a "a" field (pre-release versions of DynamoStore previously wrote directly to the calf)
let appendedLen = match updated.TryGetValue "a" with true, v -> int v.N | false, _ -> 0
// If nothing was written (e.g. the write was transmuted to a snapshot update only)
// then the count may be 0, in which case skip parse
if appendedLen = 0 then noEvents <- noEvents + 1 else

let allBatchEventTypes = [| for x in updated["c"].L -> x.S |]
match allBatchEventTypes |> Array.skip (allBatchEventTypes.Length - appendedLen) with
| [||] -> ()
Expand Down
24 changes: 14 additions & 10 deletions src/Propulsion.DynamoStore.Notifier/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : DynamoDBEvent) : KeyVal
let p = record.Dynamodb.Keys["p"].S
match FsCodec.StreamName.parse p with
| AppendsEpoch.StreamName (partitionId, epochId) ->
match int64 updated["a"].N with
| 0L -> noEvents <- noEvents + 1
| appendedLen ->
let n = int64 updated["n"].N
let i = n - appendedLen
summary.Append(partitionId).Append('/').Append(epochId).Append(' ').Append(appendedLen).Append('@').Append(i) |> ignore
let eventTypes = updated["c"].L
let isClosed = eventTypes[eventTypes.Count - 1].S |> AppendsEpoch.Events.isEventTypeClosed
let checkpoint = Checkpoint.positionOfEpochClosedAndVersion epochId isClosed n
updateTails partitionId checkpoint
// Calf writes won't have an "a" field
let appendedLen = match updated.TryGetValue "a" with true, v -> int64 v.N | false, _ -> 0
// Tip writes may not actually have added events, if the sync was transmuted to an update of the unfolds only
// In such cases, we would not want to trigger a downstream write
// (This should not actually manifest for AppendsEpoch streams; mentioning for completeness)
if appendedLen = 0 then noEvents <- noEvents + 1 else

let n = int64 updated["n"].N
let i = n - appendedLen
summary.Append(partitionId).Append('/').Append(epochId).Append(' ').Append(appendedLen).Append('@').Append(i) |> ignore
let eventTypes = updated["c"].L
let isClosed = eventTypes[eventTypes.Count - 1].S |> AppendsEpoch.Events.isEventTypeClosed
let checkpoint = Checkpoint.positionOfEpochClosedAndVersion epochId isClosed n
updateTails partitionId checkpoint
| _ ->
if p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
else otherStream <- otherStream + 1
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.11.2" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-pr.401.rc.11.3" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down