@@ -1383,7 +1383,7 @@ def __init__(self, cluster, hosts):
13831383 for future in futures :
13841384 future .result ()
13851385
1386- def execute (self , query , parameters = None , timeout = _NOT_SET , trace = False ):
1386+ def execute (self , query , parameters = None , timeout = _NOT_SET , trace = False , custom_payload = None ):
13871387 """
13881388 Execute the given query and synchronously wait for the response.
13891389
@@ -1411,6 +1411,10 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14111411 instance and not just a string. If there is an error fetching the
14121412 trace details, the :attr:`~.Statement.trace` attribute will be left as
14131413 :const:`None`.
1414+
1415+ `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
1416+ If `query` is a Statement with its own custom_payload. The message payload
1417+ will be a union of the two, with the values specified here taking precedence.
14141418 """
14151419 if timeout is _NOT_SET :
14161420 timeout = self .default_timeout
@@ -1420,7 +1424,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14201424 "The query argument must be an instance of a subclass of "
14211425 "cassandra.query.Statement when trace=True" )
14221426
1423- future = self .execute_async (query , parameters , trace )
1427+ future = self .execute_async (query , parameters , trace , custom_payload )
14241428 try :
14251429 result = future .result (timeout )
14261430 finally :
@@ -1432,7 +1436,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14321436
14331437 return result
14341438
1435- def execute_async (self , query , parameters = None , trace = False ):
1439+ def execute_async (self , query , parameters = None , trace = False , custom_payload = None ):
14361440 """
14371441 Execute the given query and return a :class:`~.ResponseFuture` object
14381442 which callbacks may be attached to for asynchronous response
@@ -1444,6 +1448,14 @@ def execute_async(self, query, parameters=None, trace=False):
14441448 :meth:`.ResponseFuture.get_query_trace()` after the request
14451449 completes to retrieve a :class:`.QueryTrace` instance.
14461450
1451+ `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
1452+ If `query` is a Statement with its own custom_payload. The message payload
1453+ will be a union of the two, with the values specified here taking precedence.
1454+
1455+ If the server sends a custom payload in the response message,
1456+ the dict can be obtained following :meth:`.ResponseFuture.result` via
1457+ :attr:`.ResponseFuture.custom_payload`
1458+
14471459 Example usage::
14481460
14491461 >>> session = cluster.connect()
@@ -1469,11 +1481,11 @@ def execute_async(self, query, parameters=None, trace=False):
14691481 ... log.exception("Operation failed:")
14701482
14711483 """
1472- future = self ._create_response_future (query , parameters , trace )
1484+ future = self ._create_response_future (query , parameters , trace , custom_payload )
14731485 future .send_request ()
14741486 return future
14751487
1476- def _create_response_future (self , query , parameters , trace ):
1488+ def _create_response_future (self , query , parameters , trace , custom_payload ):
14771489 """ Returns the ResponseFuture before calling send_request() on it """
14781490
14791491 prepared_statement = None
@@ -1523,13 +1535,16 @@ def _create_response_future(self, query, parameters, trace):
15231535 if trace :
15241536 message .tracing = True
15251537
1538+ message .update_custom_payload (query .custom_payload )
1539+ message .update_custom_payload (custom_payload )
1540+
15261541 return ResponseFuture (
15271542 self , message , query , self .default_timeout , metrics = self ._metrics ,
15281543 prepared_statement = prepared_statement )
15291544
1530- def prepare (self , query ):
1545+ def prepare (self , query , custom_payload = None ):
15311546 """
1532- Prepares a query string, returing a :class:`~cassandra.query.PreparedStatement`
1547+ Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
15331548 instance which can be used as follows::
15341549
15351550 >>> session = cluster.connect("mykeyspace")
@@ -1552,8 +1567,12 @@ def prepare(self, query):
15521567
15531568 **Important**: PreparedStatements should be prepared only once.
15541569 Preparing the same query more than once will likely affect performance.
1570+
1571+ `custom_payload` is a key value map to be passed along with the prepare
1572+ message. See :ref:`custom_payload`.
15551573 """
15561574 message = PrepareMessage (query = query )
1575+ message .custom_payload = custom_payload
15571576 future = ResponseFuture (self , message , query = None )
15581577 try :
15591578 future .send_request ()
@@ -1565,6 +1584,7 @@ def prepare(self, query):
15651584 prepared_statement = PreparedStatement .from_message (
15661585 query_id , column_metadata , pk_indexes , self .cluster .metadata , query , self .keyspace ,
15671586 self ._protocol_version )
1587+ prepared_statement .custom_payload = future .custom_payload
15681588
15691589 host = future ._current_host
15701590 try :
@@ -2636,6 +2656,7 @@ class ResponseFuture(object):
26362656 _start_time = None
26372657 _metrics = None
26382658 _paging_state = None
2659+ _custom_payload = None
26392660
26402661 def __init__ (self , session , message , query , default_timeout = None , metrics = None , prepared_statement = None ):
26412662 self .session = session
@@ -2723,6 +2744,23 @@ def has_more_pages(self):
27232744 """
27242745 return self ._paging_state is not None
27252746
2747+ @property
2748+ def custom_payload (self ):
2749+ """
2750+ The custom payload returned from the server, if any. This will only be
2751+ set by Cassandra servers implementing a custom QueryHandler, and only
2752+ for protocol_version 4+.
2753+
2754+ Ensure the future is complete before trying to access this property
2755+ (call :meth:`.result()`, or after callback is invoked).
2756+ Otherwise it may throw if the response has not been received.
2757+
2758+ :return: :ref:`custom_payload`.
2759+ """
2760+ if not self ._event .is_set ():
2761+ raise Exception ("custom_payload cannot be retrieved before ResponseFuture is finalized" )
2762+ return self ._custom_payload
2763+
27262764 def start_fetching_next_page (self ):
27272765 """
27282766 If there are more pages left in the query result, this asynchronously
@@ -2759,6 +2797,8 @@ def _set_result(self, response):
27592797 if trace_id :
27602798 self ._query_trace = QueryTrace (trace_id , self .session )
27612799
2800+ self ._custom_payload = getattr (response , 'custom_payload' , None )
2801+
27622802 if isinstance (response , ResultMessage ):
27632803 if response .kind == RESULT_KIND_SET_KEYSPACE :
27642804 session = getattr (self , 'session' , None )
0 commit comments