Skip to content

Commit

Permalink
Handle null buffer in R <-> Array conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
romainfrancois committed Sep 24, 2018
1 parent a5b8190 commit ede8e44
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 12 deletions.
12 changes: 12 additions & 0 deletions r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ Table_to_dataframe <- function(table) {
.Call(`_arrow_Table_to_dataframe`, table)
}

# Thin R wrapper: forwards `table` and `i` to the native routine
# `_arrow_Table__column` through .Call() and returns its result unchanged.
# `i` is passed as-is to the native side, so it is presumably zero-based
# (C++ indexing) -- TODO confirm against callers.
Table__column <- function(table, i) {
.Call(`_arrow_Table__column`, table, i)
}

# Thin R wrapper: forwards `column` to the native routine
# `_arrow_Column__length` through .Call() and returns its result unchanged.
Column__length <- function(column) {
.Call(`_arrow_Column__length`, column)
}

# Thin R wrapper: forwards `column` to the native routine
# `_arrow_Column__null_count` through .Call() and returns its result unchanged.
Column__null_count <- function(column) {
.Call(`_arrow_Column__null_count`, column)
}

# Thin R wrapper: forwards `name`, `type` and `nullable` to the native routine
# `_arrow_Field_initialize` through .Call() and returns its result unchanged.
# `nullable` defaults to TRUE when the caller does not supply it.
Field_initialize <- function(name, type, nullable = TRUE) {
.Call(`_arrow_Field_initialize`, name, type, nullable)
}
Expand Down
10 changes: 9 additions & 1 deletion r/R/array.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,20 @@ read_record_batch <- function(path){
`arrow::RecordBatch`$new(read_record_batch_(fs::path_abs(path)))
}

# R6 class wrapping a native arrow::Column handle. It inherits from
# `arrow::Object`; each method passes `self` to the matching native binding.
`arrow::Column` <- R6Class("arrow::Column", inherit = `arrow::Object`,
public = list(
# Result of Column__length() for this column.
length = function() Column__length(self),
# Result of Column__null_count() for this column.
null_count = function() Column__null_count(self)
)
)

# R6 class wrapping a native arrow::Table handle. It inherits from
# `arrow::Object`; each method passes `self` to the matching native binding.
`arrow::Table` <- R6Class("arrow::Table", inherit = `arrow::Object`,
  public = list(
    # Result of Table_num_columns() for this table.
    num_columns = function() Table_num_columns(self),
    # Result of Table_num_rows() for this table.
    num_rows = function() Table_num_rows(self),
    # Schema of the table, wrapped in an `arrow::Schema` R6 object.
    schema = function() `arrow::Schema`$new(Table_schema(self)),
    # Write the table to `path` (made absolute first); the result is returned
    # invisibly.
    to_file = function(path) invisible(Table_to_file(self, fs::path_abs(path))),
    # Column `i` of the table, wrapped in an `arrow::Column` R6 object. `i` is
    # forwarded unchanged to the native binding.
    column = function(i) `arrow::Column`$new(Table__column(self, i))
  )
)

Expand Down
37 changes: 37 additions & 0 deletions r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,40 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// Table__column
// Forward declaration of the C++ implementation that the entry point below calls.
std::shared_ptr<arrow::Column> Table__column(const std::shared_ptr<arrow::Table>& table, int i);
// .Call entry point `_arrow_Table__column`: converts the two SEXP arguments to
// their C++ parameter types, calls Table__column(), and wraps the result back
// into an R object. The body is bracketed by the BEGIN_RCPP / END_RCPP macros.
RcppExport SEXP _arrow_Table__column(SEXP tableSEXP, SEXP iSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& >::type table(tableSEXP);
Rcpp::traits::input_parameter< int >::type i(iSEXP);
rcpp_result_gen = Rcpp::wrap(Table__column(table, i));
return rcpp_result_gen;
END_RCPP
}
// Column__length
// Forward declaration of the C++ implementation that the entry point below calls.
int Column__length(const std::shared_ptr<arrow::Column>& column);
// .Call entry point `_arrow_Column__length`: converts the SEXP argument to a
// shared_ptr<arrow::Column>, calls Column__length(), and wraps the int result
// back into an R object. The body is bracketed by BEGIN_RCPP / END_RCPP.
RcppExport SEXP _arrow_Column__length(SEXP columnSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Column>& >::type column(columnSEXP);
rcpp_result_gen = Rcpp::wrap(Column__length(column));
return rcpp_result_gen;
END_RCPP
}
// Column__null_count
// Forward declaration of the C++ implementation that the entry point below calls.
int Column__null_count(const std::shared_ptr<arrow::Column>& column);
// .Call entry point `_arrow_Column__null_count`: converts the SEXP argument to
// a shared_ptr<arrow::Column>, calls Column__null_count(), and wraps the int
// result back into an R object. The body is bracketed by BEGIN_RCPP / END_RCPP.
RcppExport SEXP _arrow_Column__null_count(SEXP columnSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Column>& >::type column(columnSEXP);
rcpp_result_gen = Rcpp::wrap(Column__null_count(column));
return rcpp_result_gen;
END_RCPP
}
// Field_initialize
std::shared_ptr<arrow::Field> Field_initialize(const std::string& name, const std::shared_ptr<arrow::DataType>& type, bool nullable);
RcppExport SEXP _arrow_Field_initialize(SEXP nameSEXP, SEXP typeSEXP, SEXP nullableSEXP) {
Expand Down Expand Up @@ -959,6 +993,9 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_Table_to_file", (DL_FUNC) &_arrow_Table_to_file, 2},
{"_arrow_read_table_", (DL_FUNC) &_arrow_read_table_, 1},
{"_arrow_Table_to_dataframe", (DL_FUNC) &_arrow_Table_to_dataframe, 1},
{"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2},
{"_arrow_Column__length", (DL_FUNC) &_arrow_Column__length, 1},
{"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1},
{"_arrow_Field_initialize", (DL_FUNC) &_arrow_Field_initialize, 3},
{"_arrow_Field_ToString", (DL_FUNC) &_arrow_Field_ToString, 1},
{"_arrow_Field_name", (DL_FUNC) &_arrow_Field_name, 1},
Expand Down
69 changes: 58 additions & 11 deletions r/src/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SimpleRBuffer : public arrow::Buffer {
public:

SimpleRBuffer(Vec vec) :
Buffer(reinterpret_cast<const uint8_t*>(vec.begin()), vec.size() * sizeof(typename Vec::stored_type) ),
Buffer(reinterpret_cast<const uint8_t*>(vec.begin()), vec.size() * sizeof(typename Vec::stored_type)),
vec_(vec)
{}

Expand All @@ -43,15 +43,35 @@ class SimpleRBuffer : public arrow::Buffer {

template <int RTYPE, typename Type>
std::shared_ptr<arrow::Array> SimpleArray(SEXP x){
// a simple buffer that owns the memory of `x`
auto buffer = std::make_shared<SimpleRBuffer<RTYPE>>(x);
auto type = std::make_shared<Type>();

Rcpp::Vector<RTYPE> vec(x);
std::vector<std::shared_ptr<arrow::Buffer>> buffers {
nullptr,
std::make_shared<SimpleRBuffer<RTYPE>>(vec)
};

int null_count = 0;
if (RTYPE != RAWSXP) {
// TODO: maybe first count NA in a first pass so that we
// can allocate only if needed
std::shared_ptr<arrow::Buffer> null_bitmap;
R_ERROR_NOT_OK(arrow::AllocateBuffer(vec.size(), &null_bitmap));

auto null_bitmap_data = null_bitmap->mutable_data();
for (int i=0; i < vec.size(); i++) {
if (Rcpp::Vector<RTYPE>::is_na(vec[i]) ) {
BitUtil::SetBit(null_bitmap_data, i);
null_count++;
}
}
buffers[0] = null_bitmap;
}

auto data = ArrayData::Make(
type,
std::make_shared<Type>(),
LENGTH(x),
{nullptr, buffer}, /* for now we just use a nullptr for the null bitmap buffer */
0, /*null_count */
std::move(buffers),
null_count,
0 /*offset*/
);

Expand All @@ -74,7 +94,7 @@ std::shared_ptr<arrow::Array> rvector_to_Array(SEXP x){
// TODO: Dates, ...
return arrow::r::SimpleArray<REALSXP, arrow::DoubleType>(x);
case RAWSXP:
return arrow::r::SimpleArray<REALSXP, arrow::DoubleType>(x);
return arrow::r::SimpleArray<RAWSXP, arrow::Int8Type>(x);
default:
break;
}
Expand Down Expand Up @@ -122,11 +142,24 @@ std::shared_ptr<arrow::Array> RecordBatch_column(const std::shared_ptr<arrow::Re

// Copy the values of a primitive Arrow array into a newly created R vector of
// type RTYPE, then restore R's NA marker for the entries flagged in the
// array's null bitmap.
//
// The value buffer (buffers[1]) is reinterpreted as RTYPE's storage type, so
// the caller must pick an RTYPE whose storage layout matches the array's type.
// Values and bitmap are read from the start of their buffers; a non-zero array
// offset is not taken into account.
//
// Raw vectors (RAWSXP) are returned without any NA handling.
//
// NOTE(review): a *set* bit is treated as "missing" here, mirroring the way
// SimpleArray() in this file fills the bitmap. Arrow's validity bitmap is
// normally defined the other way round (set bit = valid) -- confirm, and if
// so flip the reader and the writer together so round trips stay consistent.
template <int RTYPE>
inline SEXP simple_Array_to_Vector(const std::shared_ptr<arrow::Array>& array ){
  using stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
  auto start = reinterpret_cast<const stored_type*>(array->data()->buffers[1]->data());

  // Copy the values first; the NA pass below overwrites individual slots.
  size_t n = array->length();
  Rcpp::Vector<RTYPE> vec(start, start + n);

  // Only touch the bitmap when the array reports nulls: without nulls the
  // bitmap buffer may be absent.
  if (array->null_count() && RTYPE != RAWSXP) {
    // TODO: not sure what to do with RAWSXP since
    // R raw vector do not have a concept of missing data

    auto bitmap_data = array->null_bitmap()->data();
    for (size_t i=0; i < n; i++) {
      if (BitUtil::GetBit(bitmap_data, i)) {
        vec[i] = Rcpp::Vector<RTYPE>::get_na();
      }
    }
  }

  return vec;
}

// [[Rcpp::export]]
Expand Down Expand Up @@ -164,7 +197,6 @@ inline SEXP simple_ChunkedArray_to_Vector(const std::shared_ptr<arrow::ChunkedAr
return out;
}


SEXP ChunkedArray_as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array){
switch(chunked_array->type()->id()){
case Type::INT8: return simple_ChunkedArray_to_Vector<RAWSXP>(chunked_array);
Expand Down Expand Up @@ -301,3 +333,18 @@ List Table_to_dataframe(const std::shared_ptr<arrow::Table>& table){
tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
return tbl;
}

// Return column `i` of `table`.
//
// `i` is forwarded unchanged to arrow::Table::column(), so it uses that
// method's indexing convention (presumably zero-based -- confirm with callers).
// An index outside [0, num_columns) raises an R error instead of being passed
// on to the native container unchecked.
// [[Rcpp::export]]
std::shared_ptr<arrow::Column> Table__column(const std::shared_ptr<arrow::Table>& table, int i) {
  if (i < 0 || i >= table->num_columns()) {
    Rcpp::stop("column index out of bounds");
  }
  return table->column(i);
}

// Return the value reported by column->length().
// NOTE(review): the result is returned as `int`; if length() yields a wider
// integer type, very long columns would be narrowed here -- confirm.
// [[Rcpp::export]]
int Column__length(const std::shared_ptr<arrow::Column>& column) {
return column->length();
}

// Return the value reported by column->null_count().
// NOTE(review): the result is returned as `int`; if null_count() yields a
// wider integer type, very large counts would be narrowed here -- confirm.
// [[Rcpp::export]]
int Column__null_count(const std::shared_ptr<arrow::Column>& column) {
return column->null_count();
}

0 comments on commit ede8e44

Please sign in to comment.