Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort snapshot listeners on terminate #6136

Open
wants to merge 80 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
830d631
dbToken Persistence
tom-andersen May 8, 2024
37b97be
Copyright
tom-andersen May 8, 2024
d9cf32e
Remove user from globals cache.
tom-andersen May 14, 2024
88919eb
Merge branch 'master' into tomandersen/dbToken
tom-andersen May 14, 2024
a1ff6d6
Merge branch 'master' into tomandersen/dbToken
tom-andersen May 17, 2024
297d2c4
Fix
tom-andersen May 17, 2024
09def37
Merge branch 'master' into tomandersen/dbToken
tom-andersen May 17, 2024
e80b843
Merge branch 'master' into tomandersen/dbToken
tom-andersen May 22, 2024
cb08356
Fix from review comments
tom-andersen May 27, 2024
f79a0a2
Merge branch 'master' into tomandersen/dbToken
tom-andersen May 27, 2024
686e19c
Rename
tom-andersen Jun 5, 2024
b5f66a0
Merge branch 'master' into tomandersen/dbToken
tom-andersen Jun 5, 2024
be8097f
AbstractStream refactor
tom-andersen Jun 5, 2024
b61a395
AbstractStream refactor
tom-andersen Jun 5, 2024
0d25982
Whitespace
tom-andersen Jun 5, 2024
6d0f288
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 5, 2024
f2c309f
Merge branch 'master' into tomandersen/dbToken
tom-andersen Jun 11, 2024
906acc5
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 11, 2024
b9f339a
Merge branch 'master' into tomandersen/dbToken
tom-andersen Jun 12, 2024
c65331b
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 12, 2024
01f424c
Merge branch 'main' into tomandersen/dbToken
tom-andersen Jun 14, 2024
2c4d4d1
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 14, 2024
3113171
Refactor ComponentProvider
tom-andersen Jun 14, 2024
a3be1e6
Remove dead code
tom-andersen Jun 17, 2024
c61a40c
Fix
tom-andersen Jun 17, 2024
93fd42f
Merge branch 'tomandersen/streamRefactor' into tomandersen/componentP…
tom-andersen Jun 17, 2024
4808d45
Comments and cleanup
tom-andersen Jun 17, 2024
103f775
Merge remote-tracking branch 'origin/tomandersen/componentProviderRef…
tom-andersen Jun 17, 2024
5d8d687
Merge branch 'main' into tomandersen/dbToken
tom-andersen Jun 17, 2024
0beca8a
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 17, 2024
22a97f3
Merge branch 'tomandersen/streamRefactor' into tomandersen/componentP…
tom-andersen Jun 17, 2024
b749f5c
Whitespace
tom-andersen Jun 17, 2024
89991ef
Merge remote-tracking branch 'origin/tomandersen/componentProviderRef…
tom-andersen Jun 17, 2024
29b2ef9
gRPC integration test of write handshake
tom-andersen Jun 17, 2024
fbc7ad9
PR feedback
tom-andersen Jun 17, 2024
90db84e
Merge branch 'tomandersen/dbToken' into tomandersen/streamRefactor
tom-andersen Jun 17, 2024
2b09c74
Merge branch 'tomandersen/streamRefactor' into tomandersen/componentP…
tom-andersen Jun 17, 2024
6f48bc3
Merge branch 'tomandersen/componentProviderRefactor' into tomandersen…
tom-andersen Jun 17, 2024
3aff190
Merge branch 'main' into tomandersen/streamRefactor
tom-andersen Jun 18, 2024
2699116
Merge branch 'tomandersen/streamRefactor' into tomandersen/componentP…
tom-andersen Jun 19, 2024
1cbaac1
Merge branch 'tomandersen/componentProviderRefactor' into tomandersen…
tom-andersen Jun 19, 2024
5e3f9f5
FirestoreClientProvider
tom-andersen Jun 19, 2024
7a8b052
Fix
tom-andersen Jun 21, 2024
66676d4
Merge remote-tracking branch 'origin/tomandersen/componentProviderRef…
tom-andersen Jun 21, 2024
ec7c414
Merge branch 'tomandersen/componentProviderRefactor' into tomandersen…
tom-andersen Jun 21, 2024
b797dd5
Merge branch 'tomandersen/handshakeTest' into tomandersen/firestoreCl…
tom-andersen Jun 21, 2024
f44a728
Merge remote-tracking branch 'origin/main' into tomandersen/firestore…
tom-andersen Jul 4, 2024
fae9bc0
Fix after merge
tom-andersen Jul 4, 2024
74bce13
Merge remote-tracking branch 'origin/main' into tomandersen/firestore…
tom-andersen Jul 4, 2024
0783dcd
Whitespace
tom-andersen Jul 4, 2024
63e2ec2
Fix
tom-andersen Jul 4, 2024
f2945bf
Merge remote-tracking branch 'origin/main' into tomandersen/firestore…
tom-andersen Jul 17, 2024
44540c4
Pretty
tom-andersen Jul 17, 2024
272cf53
Fix
tom-andersen Jul 17, 2024
1163a7b
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 18, 2024
285e6fd
Fixes from code review.
tom-andersen Jul 22, 2024
b1c88e2
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 22, 2024
f989c0e
Remove dead code.
tom-andersen Jul 22, 2024
0d35529
Fix according to review
tom-andersen Jul 22, 2024
1aa54b3
Add comments
tom-andersen Jul 26, 2024
4ea521e
Merge remote-tracking branch 'origin/main' into tomandersen/firestore…
tom-andersen Jul 26, 2024
0637848
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 29, 2024
a0c6ce2
Merge remote-tracking branch 'origin/main' into tomandersen/abortSnap…
tom-andersen Jul 29, 2024
abbe802
Abort targets on terminate
tom-andersen Jul 30, 2024
f61fade
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 30, 2024
98745a5
Merge branch 'tomandersen/firestoreClientProvider' into tomandersen/a…
tom-andersen Jul 30, 2024
fee2df9
Fix
tom-andersen Jul 30, 2024
f3a919d
Pretty
tom-andersen Jul 30, 2024
68cd17b
Text
tom-andersen Jul 30, 2024
a82dcd1
Changelog
tom-andersen Jul 30, 2024
00f5a2a
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 30, 2024
00d6e22
Merge branch 'main' into tomandersen/firestoreClientProvider
tom-andersen Jul 30, 2024
a9f3076
Fix
tom-andersen Jul 30, 2024
dae1f73
Pretty
tom-andersen Jul 30, 2024
c88cf9c
Fix
tom-andersen Jul 30, 2024
70e285d
Fix
tom-andersen Jul 30, 2024
cda37c7
Merge remote-tracking branch 'origin/tomandersen/firestoreClientProvi…
tom-andersen Jul 30, 2024
26bdaae
Fix
tom-andersen Jul 30, 2024
308e688
Merge branch 'main' into tomandersen/abortSnapshotListenersOnTerminate
tom-andersen Aug 12, 2024
36daf54
Merge branch 'main' into tomandersen/abortSnapshotListenersOnTerminate
tom-andersen Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion firebase-firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Unreleased

* [changed] Fail snapshot listeners when Firestore terminates. [#6136](//github.com/firebase/firebase-android-sdk/pull/6136)

# 25.1.0
* [feature] Add support for the VectorValue type. [#6154](//github.com/firebase/firebase-android-sdk/pull/6154)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,8 @@ public void testCanStopListeningAfterTerminate() {

waitFor(instance.terminate());

assertEquals(eventAccumulator.awaitError().getCode(), Code.ABORTED);

// This should proceed without error.
registration.remove();
// Multiple calls should proceed as an effectively no-op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,13 @@ private void activityScopedListenerStopsListeningWhenActivityStops(Activity acti
DocumentReference documentReference = collectionReference.document();

Semaphore events = new Semaphore(0);
collectionReference.addSnapshotListener(
activity,
(value, error) -> {
assertNull(error);
events.release();
});
ListenerRegistration listener =
collectionReference.addSnapshotListener(
activity,
(value, error) -> {
assertNull(error);
events.release();
});

// Initial events
waitFor(events, 1);
Expand All @@ -188,6 +189,8 @@ private void activityScopedListenerStopsListeningWhenActivityStops(Activity acti
// No listeners, therefore, there should be no events.
waitFor(documentReference.set(map("foo", "new-bar")));
assertEquals(0, events.availablePermits());

listener.remove();
}

/** @param activity Must be a TestActivity or a TestFragmentActivity */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testListenUnlistenRelistenSequenceOfMirrorQueries() {

// Unlisten then re-listen limit query.
limitRegistration.remove();
limit.addSnapshotListener(limitAccumulator.listener());
limitRegistration = limit.addSnapshotListener(limitAccumulator.listener());

// Verify `limit` query still works.
data = querySnapshotToValues(limitAccumulator.await());
Expand All @@ -165,13 +165,16 @@ public void testListenUnlistenRelistenSequenceOfMirrorQueries() {
// Unlisten to limitToLast, update a doc, then relisten to limitToLast
limitToLastRegistration.remove();
waitFor(collection.document("a").update(map("k", "a", "sort", -2)));
limitToLast.addSnapshotListener(limitToLastAccumulator.listener());
limitToLastRegistration = limitToLast.addSnapshotListener(limitToLastAccumulator.listener());

// Verify both query get expected result.
data = querySnapshotToValues(limitAccumulator.await());
assertEquals(asList(map("k", "a", "sort", -2L), map("k", "e", "sort", -1L)), data);
data = querySnapshotToValues(limitToLastAccumulator.await());
assertEquals(asList(map("k", "e", "sort", -1L), map("k", "a", "sort", -2L)), data);

limitRegistration.remove();
limitToLastRegistration.remove();
}

@Test
Expand Down Expand Up @@ -507,19 +510,22 @@ public void watchSurvivesNetworkDisconnect() {

Semaphore receivedDocument = new Semaphore(0);

collectionReference.addSnapshotListener(
MetadataChanges.INCLUDE,
(snapshot, error) -> {
if (!snapshot.isEmpty() && !snapshot.getMetadata().isFromCache()) {
receivedDocument.release();
}
});
ListenerRegistration listener =
collectionReference.addSnapshotListener(
MetadataChanges.INCLUDE,
(snapshot, error) -> {
if (!snapshot.isEmpty() && !snapshot.getMetadata().isFromCache()) {
receivedDocument.release();
}
});

waitFor(firestore.disableNetwork());
collectionReference.add(map("foo", FieldValue.serverTimestamp()));
waitFor(firestore.enableNetwork());

waitFor(receivedDocument);

listener.remove();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,17 @@ public void getDocumentWhileOfflineWithDifferentGetOptions() {
// Create an initial listener for this query (to attempt to disrupt the gets below) and wait for
// the listener to deliver its initial snapshot before continuing.
TaskCompletionSource<Void> source = new TaskCompletionSource<>();
docRef.addSnapshotListener(
(docSnap, error) -> {
if (error != null) {
source.setException(error);
} else {
source.setResult(null);
}
});
ListenerRegistration listener =
docRef.addSnapshotListener(
(docSnap, error) -> {
if (error != null) {
source.setException(error);
} else {
source.setResult(null);
}
});
waitFor(source.getTask());
listener.remove();

Task<DocumentSnapshot> docTask = docRef.get(Source.CACHE);
waitFor(docTask);
Expand Down Expand Up @@ -339,15 +341,17 @@ public void getCollectionWhileOfflineWithDifferentGetOptions() {
// Create an initial listener for this query (to attempt to disrupt the gets below) and wait for
// the listener to deliver its initial snapshot before continuing.
TaskCompletionSource<Void> source = new TaskCompletionSource<>();
colRef.addSnapshotListener(
(qrySnap, error) -> {
if (error != null) {
source.setException(error);
} else {
source.setResult(null);
}
});
ListenerRegistration listener =
colRef.addSnapshotListener(
(qrySnap, error) -> {
if (error != null) {
source.setException(error);
} else {
source.setResult(null);
}
});
waitFor(source.getTask());
listener.remove();

Task<QuerySnapshot> qrySnapTask = colRef.get(Source.CACHE);
waitFor(qrySnapTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.firebase.firestore.DocumentSnapshot;
import com.google.firebase.firestore.EventListener;
import com.google.firebase.firestore.FirebaseFirestoreException;
import com.google.firebase.firestore.QuerySnapshot;
import com.google.firebase.firestore.util.Logger;
import java.util.ArrayList;
Expand All @@ -30,28 +31,45 @@
public class EventAccumulator<T> {
private static final int MAX_EVENTS = 10;

private final BlockingQueue<T> events;
private final BlockingQueue<Object> events;
private boolean rejectAdditionalEvents;

public EventAccumulator() {
events = new ArrayBlockingQueue<T>(MAX_EVENTS);
events = new ArrayBlockingQueue<>(MAX_EVENTS);
}

public EventListener<T> listener() {
return (value, error) -> {
hardAssert(error == null, "Unexpected error: %s", error);
hardAssert(
!rejectAdditionalEvents, "Received event after `assertNoAdditionalEvents()` was called");
Logger.debug("EventAccumulator", "Received new event: " + value);
events.offer(value);
if (error == null) {
Logger.debug("EventAccumulator", "Received new event: " + value);
events.add(value);
} else {
Logger.debug("EventAccumulator", "Received error: " + error);
events.add(error);
}
};
}

public FirebaseFirestoreException awaitError() {
try {
return (FirebaseFirestoreException) events.take();
} catch (Exception e) {
Logger.debug("EventAccumulator", e.toString());
throw fail("Failed to receive error");
}
}

public List<T> await(int numEvents) {
try {
List<T> result = new ArrayList<>(numEvents);
for (int i = 0; i < numEvents; ++i) {
result.add(events.take());
Object event = events.take();
if (event instanceof FirebaseFirestoreException) {
fail("Unexpected error: %s", event);
}
result.add((T) event);
}
return result;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,8 @@ public Task<Void> runBatch(@NonNull WriteBatch.Function batchFunction) {
* <p>To restart after termination, simply create a new instance of {@code FirebaseFirestore} with
* {@link #getInstance()} or {@link #getInstance(FirebaseApp)}.
*
* <p>{@code terminate()} does not cancel any pending writes and any tasks that are awaiting a
* response from the server will not be resolved. The next time you start this instance, it will
* <p>{@code terminate()} does not cancel any pending writes but any write tasks that are awaiting
* a response from the server will not be resolved. The next time you start this instance, it will
* resume attempting to send these writes to the server.
*
* <p>Note: Under normal circumstances, calling {@code terminate()} is not required. This method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static com.google.firebase.firestore.util.Assert.hardAssert;

import com.google.firebase.firestore.EventListener;
import com.google.firebase.firestore.FirebaseFirestoreException;
import com.google.firebase.firestore.ListenSource;
import com.google.firebase.firestore.core.SyncEngine.SyncEngineCallback;
import com.google.firebase.firestore.util.Util;
Expand Down Expand Up @@ -265,4 +266,14 @@ public void handleOnlineStateChange(OnlineState onlineState) {
raiseSnapshotsInSyncEvent();
}
}

public void abortAllTargets() {
FirebaseFirestoreException error = Util.exceptionFromStatus(Status.ABORTED);
for (QueryListenersInfo info : queries.values()) {
for (QueryListener listener : info.listeners) {
listener.onError(error);
}
}
queries.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Task<Void> enableNetwork() {
public Task<Void> terminate() {
authProvider.removeChangeListener();
appCheckProvider.removeChangeListener();
asyncQueue.enqueueAndForgetEvenAfterShutdown(() -> eventManager.abortAllTargets());
return asyncQueue.enqueueAndInitiateShutdown(
() -> {
remoteStore.shutdown();
Expand Down
Loading