1313 WriteTimeoutErrorMessage ,
1414 UnavailableErrorMessage ,
1515 OverloadedErrorMessage ,
16- IsBootstrappingErrorMessage )
16+ IsBootstrappingErrorMessage , named_tuple_factory ,
17+ dict_factory )
1718from cassandra .metadata import Metadata
1819from cassandra .policies import (RoundRobinPolicy , SimpleConvictionPolicy ,
1920 ExponentialReconnectionPolicy , HostDistance ,
@@ -430,6 +431,19 @@ class Session(object):
430431 keyspace = None
431432 is_shutdown = False
432433
434+ row_factory = staticmethod (named_tuple_factory )
435+ """
436+ The format to return row results in. By default, each
437+ returned row will be a named tuple. You can alternatively
438+ use any of the following:
439+
440+ - :func:`cassandra.decoder.tuple_factory`
441+ - :func:`cassandra.decoder.named_tuple_factory`
442+ - :func:`cassandra.decoder.dict_factory`
443+ - :func:`cassandra.decoder.ordered_dict_factory`
444+
445+ """
446+
433447 _lock = None
434448 _pools = None
435449 _load_balancer = None
@@ -798,9 +812,14 @@ def _refresh_schema(self, connection, keyspace=None, table=None):
798812
799813 if ks_query :
800814 ks_result , cf_result , col_result = connection .wait_for_responses (ks_query , cf_query , col_query )
815+ ks_result = dict_factory (* ks_result .results )
816+ cf_result = dict_factory (* cf_result .results )
817+ col_result = dict_factory (* col_result .results )
801818 else :
802819 ks_result = None
803820 cf_result , col_result = connection .wait_for_responses (cf_query , col_query )
821+ cf_result = dict_factory (* cf_result .results )
822+ col_result = dict_factory (* col_result .results )
804823
805824 self ._cluster .metadata .rebuild_schema (keyspace , table , ks_result , cf_result , col_result )
806825
@@ -817,12 +836,14 @@ def _refresh_node_list_and_token_map(self, connection):
817836 peers_query = QueryMessage (query = self ._SELECT_PEERS , consistency_level = cl )
818837 local_query = QueryMessage (query = self ._SELECT_LOCAL , consistency_level = cl )
819838 peers_result , local_result = connection .wait_for_responses (peers_query , local_query )
839+ peers_result = dict_factory (* peers_result .results )
820840
821841 partitioner = None
822842 token_map = {}
823843
824844 if local_result .results :
825- local_row = local_result .results [0 ]
845+ local_rows = dict_factory (* (local_result .results ))
846+ local_row = local_rows [0 ]
826847 cluster_name = local_row ["cluster_name" ]
827848 self ._cluster .metadata .cluster_name = cluster_name
828849
@@ -836,7 +857,7 @@ def _refresh_node_list_and_token_map(self, connection):
836857 token_map [host ] = tokens
837858
838859 found_hosts = set ()
839- for row in peers_result . results :
860+ for row in peers_result :
840861 addr = row .get ("rpc_address" )
841862
842863 # TODO handle ipv6 equivalent
@@ -909,14 +930,15 @@ def wait_for_schema_agreement(self, connection=None):
909930 peers_query = QueryMessage (query = self ._SELECT_SCHEMA_PEERS , consistency_level = cl )
910931 local_query = QueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl )
911932 peers_result , local_result = connection .wait_for_responses (peers_query , local_query )
933+ peers_result = dict_factory (* peers_result .results )
912934
913935 versions = set ()
914936 if local_result .results :
915- local_row = local_result .results [0 ]
937+ local_row = dict_factory ( * local_result .results ) [0 ]
916938 if local_row .get ("schema_version" ):
917939 versions .add (local_row .get ("schema_version" ))
918940
919- for row in peers_result . results :
941+ for row in peers_result :
920942 if not row .get ("rpc_address" ) or not row .get ("schema_version" ):
921943 continue
922944
@@ -1074,6 +1096,7 @@ class ResponseFuture(object):
10741096
10751097 def __init__ (self , session , message , query ):
10761098 self .session = session
1099+ self .row_factory = session .row_factory
10771100 self .message = message
10781101 self .query = query
10791102
@@ -1149,7 +1172,10 @@ def _set_result(self, response):
11491172 self .session .cluster .control_connection ,
11501173 self )
11511174 else :
1152- self ._set_final_result (getattr (response , 'results' , None ))
1175+ results = getattr (response , 'results' , None )
1176+ if results :
1177+ results = self .row_factory (* results )
1178+ self ._set_final_result (results )
11531179 elif isinstance (response , ErrorMessage ):
11541180 retry_policy = self .query .retry_policy
11551181 if not retry_policy :
0 commit comments