Skip to content

Commit c2dfbab

Browse files
committed
fix(pool.py): make _replace operation thread safe
1 parent 51c016f commit c2dfbab

File tree

1 file changed

+17
-17
lines changed

1 file changed

+17
-17
lines changed

cassandra/pool.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -577,24 +577,24 @@ def _replace(self, connection):
577577
if self.is_shutdown:
578578
return
579579

580-
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
581-
try:
582-
if connection.shard_id in self._connections.keys():
583-
del self._connections[connection.shard_id]
584-
if self.host.sharding_info:
585-
self._connecting.add(connection.shard_id)
586-
self._session.submit(self._open_connection_to_missing_shard, connection.shard_id)
580+
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
581+
try:
582+
if connection.shard_id in self._connections.keys():
583+
del self._connections[connection.shard_id]
584+
if self.host.sharding_info:
585+
self._connecting.add(connection.shard_id)
586+
self._session.submit(self._open_connection_to_missing_shard, connection.shard_id)
587+
else:
588+
connection = self._session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
589+
if self._keyspace:
590+
connection.set_keyspace_blocking(self._keyspace)
591+
self._connections[connection.shard_id] = connection
592+
except Exception:
593+
log.warning("Failed reconnecting %s. Retrying." % (self.host.endpoint,))
594+
self._session.submit(self._replace, connection)
587595
else:
588-
connection = self._session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
589-
if self._keyspace:
590-
connection.set_keyspace_blocking(self._keyspace)
591-
self._connections[connection.shard_id] = connection
592-
except Exception:
593-
log.warning("Failed reconnecting %s. Retrying." % (self.host.endpoint,))
594-
self._session.submit(self._replace, connection)
595-
else:
596-
self._is_replacing = False
597-
self._stream_available_condition.notify()
596+
self._is_replacing = False
597+
self._stream_available_condition.notify()
598598

599599
def shutdown(self):
600600
log.debug("Shutting down connections to %s", self.host)

0 commit comments

Comments
 (0)