4141 from cassandra .util import WeakSet # NOQA
4242
4343from functools import partial , wraps
44- from itertools import groupby
44+ from itertools import groupby , count
4545
4646from cassandra import (ConsistencyLevel , AuthenticationFailed ,
4747 OperationTimedOut , UnsupportedOperation ,
7070 HostConnectionPool , HostConnection ,
7171 NoConnectionsAvailable )
7272from cassandra .query import (SimpleStatement , PreparedStatement , BoundStatement ,
73- BatchStatement , bind_params , QueryTrace , Statement ,
74- named_tuple_factory , dict_factory , FETCH_SIZE_UNSET )
73+ BatchStatement , bind_params , QueryTrace ,
74+ named_tuple_factory , dict_factory , tuple_factory , FETCH_SIZE_UNSET )
7575
7676
7777def _is_eventlet_monkey_patched ():
@@ -2542,6 +2542,7 @@ class _Scheduler(object):
25422542 def __init__ (self , executor ):
25432543 self ._queue = Queue .PriorityQueue ()
25442544 self ._scheduled_tasks = set ()
2545+ self ._count = count ()
25452546 self ._executor = executor
25462547
25472548 t = Thread (target = self .run , name = "Task Scheduler" )
@@ -2559,7 +2560,7 @@ def shutdown(self):
25592560 # this can happen on interpreter shutdown
25602561 pass
25612562 self .is_shutdown = True
2562- self ._queue .put_nowait ((0 , None ))
2563+ self ._queue .put_nowait ((0 , 0 , None ))
25632564
25642565 def schedule (self , delay , fn , * args , ** kwargs ):
25652566 self ._insert_task (delay , (fn , args , tuple (kwargs .items ())))
@@ -2575,7 +2576,7 @@ def _insert_task(self, delay, task):
25752576 if not self .is_shutdown :
25762577 run_at = time .time () + delay
25772578 self ._scheduled_tasks .add (task )
2578- self ._queue .put_nowait ((run_at , task ))
2579+ self ._queue .put_nowait ((run_at , next ( self . _count ), task ))
25792580 else :
25802581 log .debug ("Ignoring scheduled task after shutdown: %r" , task )
25812582
@@ -2586,7 +2587,7 @@ def run(self):
25862587
25872588 try :
25882589 while True :
2589- run_at , task = self ._queue .get (block = True , timeout = None )
2590+ run_at , i , task = self ._queue .get (block = True , timeout = None )
25902591 if self .is_shutdown :
25912592 log .debug ("Not executing scheduled task due to Scheduler shutdown" )
25922593 return
@@ -2597,7 +2598,7 @@ def run(self):
25972598 future = self ._executor .submit (fn , * args , ** kwargs )
25982599 future .add_done_callback (self ._log_if_failed )
25992600 else :
2600- self ._queue .put_nowait ((run_at , task ))
2601+ self ._queue .put_nowait ((run_at , i , task ))
26012602 break
26022603 except Queue .Empty :
26032604 pass
@@ -3408,3 +3409,24 @@ def get_all_query_traces(self, max_wait_sec_per=None):
34083409 See :meth:`.ResponseFuture.get_all_query_traces` for details.
34093410 """
34103411 return self .response_future .get_all_query_traces (max_wait_sec_per )
3412+
3413+ @property
3414+ def was_applied (self ):
3415+ """
3416+ For LWT results, returns whether the transaction was applied.
3417+
3418+ Result is indeterminate if called on a result that was not an LWT request.
3419+
3420+ Only valid when one of tne of the internal row factories is in use.
3421+ """
3422+ if self .response_future .row_factory not in (named_tuple_factory , dict_factory , tuple_factory ):
3423+ raise RuntimeError ("Cannot determine LWT result with row factory %s" % (self .response_future .row_factsory ,))
3424+ if len (self .current_rows ) != 1 :
3425+ raise RuntimeError ("LWT result should have exactly one row. This has %d." % (len (self .current_rows )))
3426+
3427+ row = self .current_rows [0 ]
3428+ if isinstance (row , tuple ):
3429+ return row [0 ]
3430+ else :
3431+ return row ['[applied]' ]
3432+
0 commit comments