Skip to content
This repository was archived by the owner on Aug 5, 2020. It is now read-only.

Commit 02a1c7f

Browse files
committed
Merge pull request apache#276 from datastax/PYTHON-280
PYTHON-280 - Custom payload for protocol v4
2 parents 4d84bcd + 9f0ca79 commit 02a1c7f

9 files changed

Lines changed: 157 additions & 33 deletions

File tree

CHANGELOG.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ Bug Fixes
7878
---------
7979
* Make execute_concurrent compatible with Python 2.6 (PYTHON-159)
8080
* Handle Unauthorized message on schema_triggers query (PYTHON-155)
81-
* Make execute_concurrent compatible with Python 2.6 (github-197)
8281
* Pure Python sorted set in support of UDTs nested in collections (PYTON-167)
8382
* Support CUSTOM index metadata and string export (PYTHON-165)
8483

cassandra/auth.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
except ImportError:
1616
SASLClient = None
1717

18+
1819
class AuthProvider(object):
1920
"""
2021
An abstract class that defines the interface that will be used for
@@ -157,6 +158,7 @@ def __init__(self, **sasl_kwargs):
157158
def new_authenticator(self, host):
158159
return SaslAuthenticator(**self.sasl_kwargs)
159160

161+
160162
class SaslAuthenticator(Authenticator):
161163
"""
162164
A pass-through :class:`~.Authenticator` using the third party package

cassandra/cluster.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,7 +1383,7 @@ def __init__(self, cluster, hosts):
13831383
for future in futures:
13841384
future.result()
13851385

1386-
def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
1386+
def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None):
13871387
"""
13881388
Execute the given query and synchronously wait for the response.
13891389
@@ -1411,6 +1411,10 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14111411
instance and not just a string. If there is an error fetching the
14121412
trace details, the :attr:`~.Statement.trace` attribute will be left as
14131413
:const:`None`.
1414+
1415+
`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
1416+
If `query` is a Statement with its own custom_payload. The message payload
1417+
will be a union of the two, with the values specified here taking precedence.
14141418
"""
14151419
if timeout is _NOT_SET:
14161420
timeout = self.default_timeout
@@ -1420,7 +1424,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14201424
"The query argument must be an instance of a subclass of "
14211425
"cassandra.query.Statement when trace=True")
14221426

1423-
future = self.execute_async(query, parameters, trace)
1427+
future = self.execute_async(query, parameters, trace, custom_payload)
14241428
try:
14251429
result = future.result(timeout)
14261430
finally:
@@ -1432,7 +1436,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
14321436

14331437
return result
14341438

1435-
def execute_async(self, query, parameters=None, trace=False):
1439+
def execute_async(self, query, parameters=None, trace=False, custom_payload=None):
14361440
"""
14371441
Execute the given query and return a :class:`~.ResponseFuture` object
14381442
which callbacks may be attached to for asynchronous response
@@ -1444,6 +1448,14 @@ def execute_async(self, query, parameters=None, trace=False):
14441448
:meth:`.ResponseFuture.get_query_trace()` after the request
14451449
completes to retrieve a :class:`.QueryTrace` instance.
14461450
1451+
`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
1452+
If `query` is a Statement with its own custom_payload. The message payload
1453+
will be a union of the two, with the values specified here taking precedence.
1454+
1455+
If the server sends a custom payload in the response message,
1456+
the dict can be obtained following :meth:`.ResponseFuture.result` via
1457+
:attr:`.ResponseFuture.custom_payload`
1458+
14471459
Example usage::
14481460
14491461
>>> session = cluster.connect()
@@ -1469,11 +1481,11 @@ def execute_async(self, query, parameters=None, trace=False):
14691481
... log.exception("Operation failed:")
14701482
14711483
"""
1472-
future = self._create_response_future(query, parameters, trace)
1484+
future = self._create_response_future(query, parameters, trace, custom_payload)
14731485
future.send_request()
14741486
return future
14751487

1476-
def _create_response_future(self, query, parameters, trace):
1488+
def _create_response_future(self, query, parameters, trace, custom_payload):
14771489
""" Returns the ResponseFuture before calling send_request() on it """
14781490

14791491
prepared_statement = None
@@ -1523,13 +1535,16 @@ def _create_response_future(self, query, parameters, trace):
15231535
if trace:
15241536
message.tracing = True
15251537

1538+
message.update_custom_payload(query.custom_payload)
1539+
message.update_custom_payload(custom_payload)
1540+
15261541
return ResponseFuture(
15271542
self, message, query, self.default_timeout, metrics=self._metrics,
15281543
prepared_statement=prepared_statement)
15291544

1530-
def prepare(self, query):
1545+
def prepare(self, query, custom_payload=None):
15311546
"""
1532-
Prepares a query string, returing a :class:`~cassandra.query.PreparedStatement`
1547+
Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
15331548
instance which can be used as follows::
15341549
15351550
>>> session = cluster.connect("mykeyspace")
@@ -1552,8 +1567,12 @@ def prepare(self, query):
15521567
15531568
**Important**: PreparedStatements should be prepared only once.
15541569
Preparing the same query more than once will likely affect performance.
1570+
1571+
`custom_payload` is a key value map to be passed along with the prepare
1572+
message. See :ref:`custom_payload`.
15551573
"""
15561574
message = PrepareMessage(query=query)
1575+
message.custom_payload = custom_payload
15571576
future = ResponseFuture(self, message, query=None)
15581577
try:
15591578
future.send_request()
@@ -1565,6 +1584,7 @@ def prepare(self, query):
15651584
prepared_statement = PreparedStatement.from_message(
15661585
query_id, column_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace,
15671586
self._protocol_version)
1587+
prepared_statement.custom_payload = future.custom_payload
15681588

15691589
host = future._current_host
15701590
try:
@@ -2636,6 +2656,7 @@ class ResponseFuture(object):
26362656
_start_time = None
26372657
_metrics = None
26382658
_paging_state = None
2659+
_custom_payload = None
26392660

26402661
def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None):
26412662
self.session = session
@@ -2723,6 +2744,23 @@ def has_more_pages(self):
27232744
"""
27242745
return self._paging_state is not None
27252746

2747+
@property
2748+
def custom_payload(self):
2749+
"""
2750+
The custom payload returned from the server, if any. This will only be
2751+
set by Cassandra servers implementing a custom QueryHandler, and only
2752+
for protocol_version 4+.
2753+
2754+
Ensure the future is complete before trying to access this property
2755+
(call :meth:`.result()`, or after callback is invoked).
2756+
Otherwise it may throw if the response has not been received.
2757+
2758+
:return: :ref:`custom_payload`.
2759+
"""
2760+
if not self._event.is_set():
2761+
raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized")
2762+
return self._custom_payload
2763+
27262764
def start_fetching_next_page(self):
27272765
"""
27282766
If there are more pages left in the query result, this asynchronously
@@ -2759,6 +2797,8 @@ def _set_result(self, response):
27592797
if trace_id:
27602798
self._query_trace = QueryTrace(trace_id, self.session)
27612799

2800+
self._custom_payload = getattr(response, 'custom_payload', None)
2801+
27622802
if isinstance(response, ResultMessage):
27632803
if response.kind == RESULT_KIND_SET_KEYSPACE:
27642804
session = getattr(self, 'session', None)

cassandra/protocol.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class InternalError(Exception):
5656

5757
COMPRESSED_FLAG = 0x01
5858
TRACING_FLAG = 0x02
59+
CUSTOM_PAYLOAD_FLAG = 0x04
5960

6061
_message_types_by_name = {}
6162
_message_types_by_opcode = {}
@@ -72,13 +73,19 @@ def __init__(cls, name, bases, dct):
7273
class _MessageType(object):
7374

7475
tracing = False
76+
custom_payload = None
7577

7678
def to_binary(self, stream_id, protocol_version, compression=None):
79+
flags = 0
7780
body = io.BytesIO()
81+
if self.custom_payload:
82+
if protocol_version < 4:
83+
raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher")
84+
flags |= CUSTOM_PAYLOAD_FLAG
85+
write_bytesmap(body, self.custom_payload)
7886
self.send_body(body, protocol_version)
7987
body = body.getvalue()
8088

81-
flags = 0
8289
if compression and len(body) > 0:
8390
body = compression(body)
8491
flags |= COMPRESSED_FLAG
@@ -91,6 +98,12 @@ def to_binary(self, stream_id, protocol_version, compression=None):
9198

9299
return msg.getvalue()
93100

101+
def update_custom_payload(self, other):
102+
if other:
103+
if not self.custom_payload:
104+
self.custom_payload = {}
105+
self.custom_payload.update(other)
106+
94107
def __repr__(self):
95108
return '<%s(%s)>' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in _get_params(self)))
96109

@@ -118,13 +131,20 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b
118131
else:
119132
trace_id = None
120133

134+
if flags & CUSTOM_PAYLOAD_FLAG:
135+
custom_payload = read_bytesmap(body)
136+
flags ^= CUSTOM_PAYLOAD_FLAG
137+
else:
138+
custom_payload = None
139+
121140
if flags:
122141
log.warning("Unknown protocol flags set: %02x. May cause problems.", flags)
123142

124143
msg_class = _message_types_by_opcode[opcode]
125144
msg = msg_class.recv_body(body, protocol_version, user_type_map)
126145
msg.stream_id = stream_id
127146
msg.trace_id = trace_id
147+
msg.custom_payload = custom_payload
128148
return msg
129149

130150

@@ -977,6 +997,11 @@ def read_binary_string(f):
977997
return contents
978998

979999

1000+
def write_binary_string(f, s):
1001+
write_short(f, len(s))
1002+
f.write(s)
1003+
1004+
9801005
def write_string(f, s):
9811006
if isinstance(s, six.text_type):
9821007
s = s.encode('utf8')
@@ -1028,6 +1053,22 @@ def write_stringmap(f, strmap):
10281053
write_string(f, v)
10291054

10301055

1056+
def read_bytesmap(f):
1057+
numpairs = read_short(f)
1058+
bytesmap = {}
1059+
for _ in range(numpairs):
1060+
k = read_string(f)
1061+
bytesmap[k] = read_binary_string(f)
1062+
return bytesmap
1063+
1064+
1065+
def write_bytesmap(f, bytesmap):
1066+
write_short(f, len(bytesmap))
1067+
for k, v in bytesmap.items():
1068+
write_string(f, k)
1069+
write_binary_string(f, v)
1070+
1071+
10311072
def read_stringmultimap(f):
10321073
numkeys = read_short(f)
10331074
strmmap = {}

0 commit comments

Comments
 (0)