Skip to content

Commit f92bd4c

Browse files
committed
Merge remote-tracking branch 'origin/3.1'
Conflicts: CHANGELOG.rst tests/integration/cqlengine/columns/test_validation.py
2 parents d90d037 + 853a77a commit f92bd4c

22 files changed

Lines changed: 468 additions & 58 deletions

CHANGELOG.rst

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
3.2.0
2-
=====
3-
4-
Features
5-
--------
6-
7-
Bug Fixes
8-
---------
9-
* Fix Python3 bug with Timers and heaps (github #466)
10-
111
3.1.0
122
=====
133

@@ -16,12 +6,14 @@ Features
166
* Pass name of server auth class to AuthProvider (PYTHON-454)
177
* Surface schema agreed flag for DDL statements (PYTHON-458)
188
* Automatically convert float and int to Decimal on serialization (PYTHON-468)
9+
* Expose prior state information via cqlengine LWTException (github #343)
1910

2011
Bug Fixes
2112
---------
2213
* Bus error (alignment issues) when running cython on some ARM platforms (PYTHON-450)
2314
* Overflow when decoding large collections (cython) (PYTHON-459)
2415
* Crash when updating a UDT column with a None value (github #467)
16+
* Timer heap comparison issue with Python 3 (github #466)
2517

2618
3.0.0
2719
=====

cassandra/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def emit(self, record):
2323
logging.getLogger('cassandra').addHandler(NullHandler())
2424

2525

26-
__version_info__ = (3, 1, '0a1', 'post0')
26+
__version_info__ = (3, 1, '0a2', 'post0')
2727
__version__ = '.'.join(map(str, __version_info__))
2828

2929

cassandra/cluster.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from cassandra.util import WeakSet # NOQA
4242

4343
from functools import partial, wraps
44-
from itertools import groupby
44+
from itertools import groupby, count
4545

4646
from cassandra import (ConsistencyLevel, AuthenticationFailed,
4747
OperationTimedOut, UnsupportedOperation,
@@ -70,8 +70,8 @@
7070
HostConnectionPool, HostConnection,
7171
NoConnectionsAvailable)
7272
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
73-
BatchStatement, bind_params, QueryTrace, Statement,
74-
named_tuple_factory, dict_factory, FETCH_SIZE_UNSET)
73+
BatchStatement, bind_params, QueryTrace,
74+
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
7575

7676

7777
def _is_eventlet_monkey_patched():
@@ -2542,6 +2542,7 @@ class _Scheduler(object):
25422542
def __init__(self, executor):
25432543
self._queue = Queue.PriorityQueue()
25442544
self._scheduled_tasks = set()
2545+
self._count = count()
25452546
self._executor = executor
25462547

25472548
t = Thread(target=self.run, name="Task Scheduler")
@@ -2559,7 +2560,7 @@ def shutdown(self):
25592560
# this can happen on interpreter shutdown
25602561
pass
25612562
self.is_shutdown = True
2562-
self._queue.put_nowait((0, None))
2563+
self._queue.put_nowait((0, 0, None))
25632564

25642565
def schedule(self, delay, fn, *args, **kwargs):
25652566
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
@@ -2575,7 +2576,7 @@ def _insert_task(self, delay, task):
25752576
if not self.is_shutdown:
25762577
run_at = time.time() + delay
25772578
self._scheduled_tasks.add(task)
2578-
self._queue.put_nowait((run_at, task))
2579+
self._queue.put_nowait((run_at, next(self._count), task))
25792580
else:
25802581
log.debug("Ignoring scheduled task after shutdown: %r", task)
25812582

@@ -2586,7 +2587,7 @@ def run(self):
25862587

25872588
try:
25882589
while True:
2589-
run_at, task = self._queue.get(block=True, timeout=None)
2590+
run_at, i, task = self._queue.get(block=True, timeout=None)
25902591
if self.is_shutdown:
25912592
log.debug("Not executing scheduled task due to Scheduler shutdown")
25922593
return
@@ -2597,7 +2598,7 @@ def run(self):
25972598
future = self._executor.submit(fn, *args, **kwargs)
25982599
future.add_done_callback(self._log_if_failed)
25992600
else:
2600-
self._queue.put_nowait((run_at, task))
2601+
self._queue.put_nowait((run_at, i, task))
26012602
break
26022603
except Queue.Empty:
26032604
pass
@@ -3408,3 +3409,24 @@ def get_all_query_traces(self, max_wait_sec_per=None):
34083409
See :meth:`.ResponseFuture.get_all_query_traces` for details.
34093410
"""
34103411
return self.response_future.get_all_query_traces(max_wait_sec_per)
3412+
3413+
@property
3414+
def was_applied(self):
3415+
"""
3416+
For LWT results, returns whether the transaction was applied.
3417+
3418+
Result is indeterminate if called on a result that was not an LWT request.
3419+
3420+
Only valid when one of tne of the internal row factories is in use.
3421+
"""
3422+
if self.response_future.row_factory not in (named_tuple_factory, dict_factory, tuple_factory):
3423+
raise RuntimeError("Cannot determine LWT result with row factory %s" % (self.response_future.row_factsory,))
3424+
if len(self.current_rows) != 1:
3425+
raise RuntimeError("LWT result should have exactly one row. This has %d." % (len(self.current_rows)))
3426+
3427+
row = self.current_rows[0]
3428+
if isinstance(row, tuple):
3429+
return row[0]
3430+
else:
3431+
return row['[applied]']
3432+

cassandra/cqlengine/query.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,17 @@ class IfNotExistsWithCounterColumn(CQLEngineException):
4040

4141

4242
class LWTException(CQLEngineException):
43-
pass
43+
"""Lightweight transaction exception.
44+
45+
This exception will be raised when a write using an `IF` clause could not be
46+
applied due to existing data violating the condition. The existing data is
47+
available through the ``existing`` attribute.
48+
49+
:param existing: The current state of the data which prevented the write.
50+
"""
51+
def __init__(self, existing):
52+
super(LWTException, self).__init__(self)
53+
self.existing = existing
4454

4555

4656
class DoesNotExist(QueryException):
@@ -53,12 +63,14 @@ class MultipleObjectsReturned(QueryException):
5363

5464
def check_applied(result):
5565
"""
56-
check if result contains some column '[applied]' with false value,
57-
if that value is false, it means our light-weight transaction didn't
58-
applied to database.
66+
Raises LWTException if it looks like a failed LWT request.
5967
"""
60-
if result and '[applied]' in result[0] and not result[0]['[applied]']:
61-
raise LWTException('')
68+
try:
69+
applied = result.was_applied
70+
except Exception:
71+
applied = True # result was not LWT form
72+
if not applied:
73+
raise LWTException(result[0])
6274

6375

6476
class AbstractQueryableColumn(UnicodeMixin):

cassandra/cython_utils.pyx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,15 @@ from cassandra.util import is_little_endian
3535

3636
import_datetime()
3737

38+
DEF DAY_IN_SECONDS = 86400
39+
3840
DATETIME_EPOC = datetime.datetime(1970, 1, 1)
3941

4042

4143
cdef datetime_from_timestamp(double timestamp):
42-
cdef int seconds = <int> timestamp
43-
cdef int microseconds = (<int64_t> (timestamp * 1000000)) % 1000000
44-
return DATETIME_EPOC + timedelta_new(0, seconds, microseconds)
44+
cdef int days = <int> (timestamp / DAY_IN_SECONDS)
45+
cdef int64_t days_in_seconds = (<int64_t> days) * DAY_IN_SECONDS
46+
cdef int seconds = <int> (timestamp - days_in_seconds)
47+
cdef int microseconds = <int> ((timestamp - days_in_seconds - seconds) * 1000000)
48+
49+
return DATETIME_EPOC + timedelta_new(days, seconds, microseconds)

docs/api/cassandra/cqlengine/models.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ Model
4848
See the `list of supported table properties for more information
4949
<http://www.datastax.com/documentation/cql/3.1/cql/cql_reference/tabProp.html>`_.
5050

51-
5251
.. attribute:: __options__
5352

5453
For example:
@@ -89,15 +88,15 @@ Model
8988
object is determined by its primary key(s). And please note using this flag
9089
would incur performance cost.
9190

92-
if the insertion didn't applied, a LWTException exception would be raised.
91+
If the insertion isn't applied, a :class:`~cassandra.cqlengine.query.LWTException` is raised.
9392

9493
.. code-block:: python
9594
9695
try:
9796
TestIfNotExistsModel.if_not_exists().create(id=id, count=9, text='111111111111')
9897
except LWTException as e:
9998
# handle failure case
100-
print e.existing # existing object
99+
print e.existing # dict containing LWT result fields
101100
102101
This method is supported on Cassandra 2.0 or later.
103102

@@ -111,15 +110,16 @@ Model
111110
Simply specify the column(s) and the expected value(s). As with if_not_exists,
112111
this incurs a performance cost.
113112

114-
If the insertion isn't applied, a LWTException is raised
113+
If the insertion isn't applied, a :class:`~cassandra.cqlengine.query.LWTException` is raised.
115114

116115
.. code-block:: python
117116
118117
t = TestTransactionModel(text='some text', count=5)
119118
try:
120119
t.iff(count=5).update('other text')
121120
except LWTException as e:
122-
# handle failure
121+
# handle failure case
122+
print e.existing # existing object
123123
124124
.. automethod:: get
125125

docs/api/cassandra/cqlengine/query.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ The methods here are used to filter, order, and constrain results.
4242

4343
.. autoclass:: MultipleObjectsReturned
4444

45+
.. autoclass:: LWTException

setup.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ def __init__(self, ext):
171171
=================================================================================
172172
"""
173173

174-
175174
is_windows = os.name == 'nt'
176175

177176
is_pypy = "PyPy" in sys.version
@@ -323,6 +322,54 @@ def _setup_extensions(self):
323322
sys.stderr.write("Failed to cythonize one or more modules. These will not be compiled as extensions (optional).\n")
324323

325324

325+
def pre_build_check():
326+
"""
327+
Try to verify build tools
328+
"""
329+
if os.environ.get('CASS_DRIVER_NO_PRE_BUILD_CHECK'):
330+
return True
331+
332+
try:
333+
from distutils.ccompiler import new_compiler
334+
from distutils.sysconfig import customize_compiler
335+
from distutils.dist import Distribution
336+
337+
# base build_ext just to emulate compiler option setup
338+
be = build_ext(Distribution())
339+
be.initialize_options()
340+
be.finalize_options()
341+
342+
# First, make sure we have a Python include directory
343+
have_python_include = any(os.path.isfile(os.path.join(p, 'Python.h')) for p in be.include_dirs)
344+
if not have_python_include:
345+
sys.stderr.write("Did not find 'Python.h' in %s.\n" % (be.include_dirs,))
346+
return False
347+
348+
compiler = new_compiler(compiler=be.compiler)
349+
customize_compiler(compiler)
350+
351+
executables = []
352+
if compiler.compiler_type in ('unix', 'cygwin'):
353+
executables = [compiler.executables[exe][0] for exe in ('compiler_so', 'linker_so')]
354+
elif compiler.compiler_type == 'nt':
355+
executables = [getattr(compiler, exe) for exe in ('cc', 'linker')]
356+
357+
if executables:
358+
from distutils.spawn import find_executable
359+
for exe in executables:
360+
if not find_executable(exe):
361+
sys.stderr.write("Failed to find %s for compiler type %s.\n" % (exe, compiler.compiler_type))
362+
return False
363+
364+
except Exception as exc:
365+
sys.stderr.write('%s\n' % str(exc))
366+
sys.stderr.write("Failed pre-build check. Attempting anyway.\n")
367+
368+
# if we are unable to positively id the compiler type, or one of these assumptions fails,
369+
# just proceed as we would have without the check
370+
return True
371+
372+
326373
def run_setup(extensions):
327374

328375
kw = {'cmdclass': {'doc': DocCommand}}
@@ -336,7 +383,14 @@ def run_setup(extensions):
336383
kw['ext_modules'] = [Extension('DUMMY', [])] # dummy extension makes sure build_ext is called for install
337384

338385
if try_cython:
339-
kw['setup_requires'] = ['Cython>=0.20']
386+
# precheck compiler before adding to setup_requires
387+
# we don't actually negate try_cython because:
388+
# 1.) build_ext eats errors at compile time, letting the install complete while producing useful feedback
389+
# 2.) there could be a case where the python environment has cython installed but the system doesn't have build tools
390+
if pre_build_check():
391+
kw['setup_requires'] = ['Cython>=0.20']
392+
else:
393+
sys.stderr.write("Bypassing Cython setup requirement\n")
340394

341395
dependencies = ['six >=1.6']
342396

tests/integration/__init__.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
except ImportError:
1818
import unittest # noqa
1919

20+
import logging
21+
import os
2022
import socket
21-
import os, six, time, sys, logging, traceback
23+
import sys
24+
import time
25+
import traceback
2226
from threading import Event
2327
from subprocess import call
2428
from itertools import groupby
@@ -147,9 +151,9 @@ def _get_cass_version_from_dse(dse_version):
147151
def wait_for_node_socket(node, timeout):
148152
binary_itf = node.network_interfaces['binary']
149153
if not common.check_socket_listening(binary_itf, timeout=timeout):
150-
print("Unable to connect to binary socket for node"+str(node))
154+
log.warn("Unable to connect to binary socket for node " + node.name)
151155
else:
152-
print("Node is up and listening "+str(node))
156+
log.debug("Node %s is up and listening " % (node.name,))
153157

154158

155159
def check_socket_listening(itf, timeout=60):
@@ -270,13 +274,13 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
270274
node.set_workloads(workloads)
271275
log.debug("Starting CCM cluster: {0}".format(cluster_name))
272276
CCM_CLUSTER.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=jvm_args)
273-
#Added to wait for slow nodes to start up
277+
# Added to wait for slow nodes to start up
274278
for node in CCM_CLUSTER.nodes.values():
275279
wait_for_node_socket(node, 120)
276280
setup_keyspace(ipformat=ipformat)
277281
except Exception:
278282
log.exception("Failed to start CCM cluster; removing cluster.")
279-
283+
280284
if os.name == "nt":
281285
if CCM_CLUSTER:
282286
for node in CCM_CLUSTER.nodes.itervalues():
@@ -591,5 +595,3 @@ def setUp(self):
591595

592596
def tearDown(self):
593597
self.cluster.shutdown()
594-
595-

0 commit comments

Comments
 (0)