Skip to content

Commit

Permalink
read_arrow and write_arrow now relate to arrow::Table.
Browse files Browse the repository at this point in the history
+ read_table, write_table
  • Loading branch information
romainfrancois committed Sep 24, 2018
1 parent 110b00d commit 908c2ac
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 11 deletions.
2 changes: 2 additions & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ S3method("==","arrow::Array")
S3method("==","arrow::DataType")
S3method("==","arrow::Field")
S3method(as_tibble,"arrow::RecordBatch")
S3method(as_tibble,"arrow::Table")
S3method(length,"arrow::Array")
S3method(print,"arrow-enum")
export(DateUnit)
Expand All @@ -31,6 +32,7 @@ export(list_of)
export(null)
export(read_arrow)
export(read_record_batch)
export(read_table)
export(record_batch)
export(schema)
export(struct)
Expand Down
12 changes: 12 additions & 0 deletions r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ read_record_batch_ <- function(path) {
.Call(`_arrow_read_record_batch_`, path)
}

# Thin R shim over the native routine `_arrow_Table_to_file`.
# `table` and `path` are forwarded unchanged through `.Call`; the value
# returned by the native routine is returned as is.
Table_to_file <- function(table, path) {
.Call(`_arrow_Table_to_file`, table, path)
}

# Thin R shim over the native routine `_arrow_read_table_`.
# `path` is forwarded unchanged through `.Call`; the value returned by the
# native routine is returned as is.
read_table_ <- function(path) {
.Call(`_arrow_read_table_`, path)
}

# Thin R shim over the native routine `_arrow_Table_to_dataframe`.
# `table` is forwarded unchanged through `.Call`; the value returned by the
# native routine is returned as is.
Table_to_dataframe <- function(table) {
.Call(`_arrow_Table_to_dataframe`, table)
}

# Thin R shim over the native routine `_arrow_Field_initialize`.
# `name`, `type` and `nullable` (TRUE when omitted) are forwarded unchanged
# through `.Call`; the value returned by the native routine is returned as is.
Field_initialize <- function(name, type, nullable = TRUE) {
.Call(`_arrow_Field_initialize`, name, type, nullable)
}
Expand Down
35 changes: 24 additions & 11 deletions r/R/array.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ array <- function(...){
RecordBatch_to_dataframe(x)
}

#' @export
write_arrow <- function(data, path) {
  # Build a record batch from `data`, then let the batch write itself to `path`.
  batch <- record_batch(data)
  batch$to_file(path)
}

#' @export
read_arrow <- function(path) {
  # Load the record batch stored at `path`, then convert it with as_tibble().
  batch <- read_record_batch(path)
  as_tibble(batch)
}

#' Create an arrow::RecordBatch from a data frame
#'
#' @param .data a data frame
Expand All @@ -106,7 +96,8 @@ read_record_batch <- function(path){
public = list(
num_columns = function() Table_num_columns(self),
num_rows = function() Table_num_rows(self),
schema = function() `arrow::Schema`$new(Table_schema(self))
schema = function() `arrow::Schema`$new(Table_schema(self)),
to_file = function(path) invisible(Table_to_file(self, fs::path_abs(path)))
)
)

Expand All @@ -118,3 +109,25 @@ read_record_batch <- function(path){
table <- function(.data) {
  # dataframe_to_Table() does the conversion; its result is wrapped in the
  # `arrow::Table` R6 class.
  converted <- dataframe_to_Table(.data)
  `arrow::Table`$new(converted)
}

#' @export
write_arrow <- function(data, path) {
  # Convert `data` with table(), then let the resulting object write itself
  # to `path` through its to_file() method.
  tbl <- table(data)
  tbl$to_file(path)
}

#' @export
read_table <- function(path) {
  # The native reader is given an absolute path; whatever it returns is
  # wrapped in the `arrow::Table` R6 class.
  absolute_path <- fs::path_abs(path)
  loaded <- read_table_(absolute_path)
  `arrow::Table`$new(loaded)
}

#' @export
`as_tibble.arrow::Table` <- function(x, ...) {
  # as_tibble() method for `arrow::Table`: the conversion is delegated to
  # Table_to_dataframe(); arguments in `...` are not used.
  converted <- Table_to_dataframe(x)
  converted
}

#' @export
read_arrow <- function(path) {
  # Read the file at `path` with read_table(), then convert the result with
  # as_tibble().
  tbl <- read_table(path)
  as_tibble(tbl)
}


37 changes: 37 additions & 0 deletions r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,40 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// Table_to_file
// Declaration of the C++ function wrapped below.
int Table_to_file(const std::shared_ptr<arrow::Table>& table, std::string path);
// .Call entry point for Table_to_file: converts the two SEXP arguments to
// their C++ parameter types, calls Table_to_file and returns its result
// wrapped with Rcpp::wrap.
RcppExport SEXP _arrow_Table_to_file(SEXP tableSEXP, SEXP pathSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
// SEXP -> C++ argument conversion
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& >::type table(tableSEXP);
Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
rcpp_result_gen = Rcpp::wrap(Table_to_file(table, path));
return rcpp_result_gen;
END_RCPP
}
// read_table_
// Declaration of the C++ function wrapped below.
std::shared_ptr<arrow::Table> read_table_(std::string path);
// .Call entry point for read_table_: converts the SEXP argument to a
// std::string, calls read_table_ and returns its result wrapped with
// Rcpp::wrap.
RcppExport SEXP _arrow_read_table_(SEXP pathSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
// SEXP -> C++ argument conversion
Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
rcpp_result_gen = Rcpp::wrap(read_table_(path));
return rcpp_result_gen;
END_RCPP
}
// Table_to_dataframe
// Declaration of the C++ function wrapped below.
List Table_to_dataframe(const std::shared_ptr<arrow::Table>& table);
// .Call entry point for Table_to_dataframe: converts the SEXP argument to its
// C++ parameter type, calls Table_to_dataframe and returns its result wrapped
// with Rcpp::wrap.
RcppExport SEXP _arrow_Table_to_dataframe(SEXP tableSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
// SEXP -> C++ argument conversion
Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& >::type table(tableSEXP);
rcpp_result_gen = Rcpp::wrap(Table_to_dataframe(table));
return rcpp_result_gen;
END_RCPP
}
// Field_initialize
std::shared_ptr<arrow::Field> Field_initialize(const std::string& name, const std::shared_ptr<arrow::DataType>& type, bool nullable);
RcppExport SEXP _arrow_Field_initialize(SEXP nameSEXP, SEXP typeSEXP, SEXP nullableSEXP) {
Expand Down Expand Up @@ -922,6 +956,9 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_Table_schema", (DL_FUNC) &_arrow_Table_schema, 1},
{"_arrow_RecordBatch_to_file", (DL_FUNC) &_arrow_RecordBatch_to_file, 2},
{"_arrow_read_record_batch_", (DL_FUNC) &_arrow_read_record_batch_, 1},
{"_arrow_Table_to_file", (DL_FUNC) &_arrow_Table_to_file, 2},
{"_arrow_read_table_", (DL_FUNC) &_arrow_read_table_, 1},
{"_arrow_Table_to_dataframe", (DL_FUNC) &_arrow_Table_to_dataframe, 1},
{"_arrow_Field_initialize", (DL_FUNC) &_arrow_Field_initialize, 3},
{"_arrow_Field_ToString", (DL_FUNC) &_arrow_Field_ToString, 1},
{"_arrow_Field_name", (DL_FUNC) &_arrow_Field_name, 1},
Expand Down
88 changes: 88 additions & 0 deletions r/src/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,41 @@ SEXP Array_as_vector(const std::shared_ptr<arrow::Array>& array){
return R_NilValue;
}

// Concatenates the values of every chunk of `chunked_array` into one newly
// allocated R vector of type RTYPE and returns it.
//
// Each chunk's values buffer (buffers[1]) is reinterpreted as the R vector's
// storage type and copied verbatim, so RTYPE has to be chosen by the caller so
// that its stored_type has the same width as the arrow value type.
//
// Limitation: validity bitmaps are not consulted yet, so null slots end up
// holding whatever the values buffer contains rather than NA.
template <int RTYPE>
inline SEXP simple_ChunkedArray_to_Vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array){
  using stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
  // no_init: every element is overwritten by the copies below
  Rcpp::Vector<RTYPE> out = no_init(chunked_array->length());
  auto p = out.begin();

  for (int i=0; i<chunked_array->num_chunks(); i++) {
    auto chunk = chunked_array->chunk(i);
    const auto n = chunk->length();

    // Nothing to copy; this also avoids touching the values buffer of an
    // empty chunk (NOTE(review): such a chunk may have no buffer allocated).
    if (n == 0) continue;

    // A chunk can be a slice of a larger buffer: its first logical value then
    // sits `offset` elements into the buffer, not at the start.
    const auto& data = chunk->data();
    const stored_type* values =
      reinterpret_cast<const stored_type*>(data->buffers[1]->data()) + data->offset;

    // copy the data
    p = std::copy_n(values, n, p);

    // set NA using the bitmap, TODO
  }
  return out;
}


// Converts a chunked array to an R vector, picking the R vector type from the
// arrow type id: INT8 -> RAWSXP, INT32 -> INTSXP, DOUBLE -> REALSXP.
// Any other type id is reported through stop().
SEXP ChunkedArray_as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array){
  const auto type_id = chunked_array->type()->id();

  if (type_id == Type::INT8) {
    return simple_ChunkedArray_to_Vector<RAWSXP>(chunked_array);
  }
  if (type_id == Type::INT32) {
    return simple_ChunkedArray_to_Vector<INTSXP>(chunked_array);
  }
  if (type_id == Type::DOUBLE) {
    return simple_ChunkedArray_to_Vector<REALSXP>(chunked_array);
  }

  // unsupported type
  stop(tfm::format("cannot handle Array of type %d", type_id));
  // kept so that every path of this non-void function ends in a return statement
  return R_NilValue;
}

// [[Rcpp::export]]
List RecordBatch_to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch){
int nc = batch->num_columns();
Expand Down Expand Up @@ -213,3 +248,56 @@ std::shared_ptr<arrow::RecordBatch> read_record_batch_(std::string path) {
R_ERROR_NOT_OK(stream->Close());
return batch;
}

// Writes `table` to the file at `path` using arrow's RecordBatchFileWriter.
//
// Returns the position reported by the output stream once the writer has been
// closed, converted to int. Every arrow call is checked with R_ERROR_NOT_OK
// (defined elsewhere; presumably it raises an R error on a non-OK status).
// [[Rcpp::export]]
int Table_to_file(const std::shared_ptr<arrow::Table>& table, std::string path) {
  // destination file
  std::shared_ptr<arrow::io::OutputStream> sink;
  R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &sink));

  // writer bound to the table's schema; it writes into `sink`
  std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileWriter::Open(sink.get(), table->schema(), &writer));
  R_ERROR_NOT_OK(writer->WriteTable(*table));
  R_ERROR_NOT_OK(writer->Close());

  // the stream position is queried after the writer is closed and before the
  // stream itself is closed
  int64_t position = 0;
  R_ERROR_NOT_OK(sink->Tell(&position));
  R_ERROR_NOT_OK(sink->Close());
  return static_cast<int>(position);
}

// [[Rcpp::export]]
std::shared_ptr<arrow::Table> read_table_(std::string path) {
std::shared_ptr<arrow::io::ReadableFile> stream;
std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;

R_ERROR_NOT_OK(arrow::io::ReadableFile::Open(path, &stream));
R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));

int num_batches = rbf_reader->num_record_batches();
std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
for (int i=0; i<num_batches; i++) {
R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(i, &batches[i]));
}

std::shared_ptr<arrow::Table> table;
R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table)) ;
R_ERROR_NOT_OK(stream->Close());
return table;
}

// Converts an arrow::Table into an R list carrying the attributes of a tibble.
//
// Each column is converted with ChunkedArray_as_vector and stored under the
// column's name. The list is then given the tibble class vector and row names
// in R's compact form c(NA, -nrow).
// [[Rcpp::export]]
List Table_to_dataframe(const std::shared_ptr<arrow::Table>& table){
  int nc = table->num_columns();
  int nr = table->num_rows();
  List tbl(nc);
  CharacterVector names(nc);
  for(int i=0; i<nc; i++) {
    auto column = table->column(i);
    tbl[i] = ChunkedArray_as_vector(column->data());
    names[i] = column->name();
  }
  tbl.attr("names") = names;
  // tibble's class vector is c("tbl_df", "tbl", "data.frame"); the previous
  // "tbf_df" spelling meant the result was not recognised as a tibble
  tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame");
  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
  return tbl;
}

0 comments on commit 908c2ac

Please sign in to comment.