#include "numpy/numpy_scan.h"

#include <cmath>
#include <cstring>
#include <limits>

#include "common/exception/runtime.h"
#include "common/type_utils.h"
#include "common/types/timestamp_t.h"
#include "pandas/pandas_bind.h"
#include "py_conversion.h"
#include "py_str_utils.h"
#include "utf8proc_wrapper.h"
namespace lbug {
using namespace lbug::common;
template
void ScanNumpyColumn(py::array& npArray, uint64_t offset, ValueVector* outputVector,
uint64_t count) {
auto srcData = (T*)npArray.data();
memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
}
template
void scanNumpyMasked(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
ValueVector* outputVector) {
DASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
auto& npColumn = reinterpret_cast(*bindData->pandasCol);
ScanNumpyColumn(npColumn.array, offset, outputVector, count);
if (bindData->mask != nullptr) {
UNREACHABLE_CODE;
}
}
template
void setNullIfNan(T value, uint64_t pos, ValueVector* outputVector) {
if (std::isnan(value)) {
outputVector->setNull(pos, true /* isNull */);
}
}
template
void ScanNumpyFpColumn(const T* srcData, uint64_t count, uint64_t offset,
ValueVector* outputVector) {
memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
for (auto i = 0u; i < count; i++) {
setNullIfNan(outputVector->getValue(i), i, outputVector);
}
}
template
static void appendPythonUnicode(T* codepoints, uint64_t codepointLength,
ValueVector* vectorToAppend, uint64_t pos) {
uint64_t utf8StrLen = 0;
for (auto i = 0u; i < codepointLength; i++) {
auto len = utf8proc::Utf8Proc::codepointLength(int(codepoints[i]));
DASSERT(len >= 1);
utf8StrLen += len;
}
auto& strToAppend = StringVector::reserveString(vectorToAppend, pos, utf8StrLen);
// utf8proc_codepoint_to_utf8 requires that:
// 1. codePointLen must be an int.
// 2. dataToWrite must be a char*.
int codePointLen = 0;
auto dataToWrite = (char*)strToAppend.getData();
for (auto i = 0u; i < codepointLength; i++) {
utf8proc::Utf8Proc::codepointToUtf8(int(codepoints[i]), codePointLen, dataToWrite);
DASSERT(codePointLen >= 1);
dataToWrite += codePointLen;
}
if (!string_t::isShortString(utf8StrLen)) {
memcpy(strToAppend.prefix, strToAppend.getData(), string_t::PREFIX_LENGTH);
}
}
void NumpyScan::scan(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
common::ValueVector* outputVector) {
DASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
auto& npCol = reinterpret_cast(*bindData->pandasCol);
auto& array = npCol.array;
switch (bindData->npType.type) {
case NumpyNullableType::BOOL:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::UINT_8:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::UINT_16:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::UINT_32:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::UINT_64:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::INT_8:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::INT_16:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::INT_32:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::INT_64:
scanNumpyMasked(bindData, count, offset, outputVector);
break;
case NumpyNullableType::FLOAT_32:
ScanNumpyFpColumn(reinterpret_cast(array.data()), count, offset,
outputVector);
break;
case NumpyNullableType::FLOAT_64:
ScanNumpyFpColumn(reinterpret_cast(array.data()), count, offset,
outputVector);
break;
case NumpyNullableType::DATETIME_S:
case NumpyNullableType::DATETIME_MS:
case NumpyNullableType::DATETIME_NS:
case NumpyNullableType::DATETIME_US: {
auto sourceData = reinterpret_cast(array.data());
auto dstData = reinterpret_cast(outputVector->getData());
for (auto i = 0u; i < count; i++) {
auto pos = offset + i;
dstData[i].value = sourceData[pos];
outputVector->setNull(i, false /* isNull */);
}
break;
}
case NumpyNullableType::TIMEDELTA: {
auto sourceData = reinterpret_cast(array.data());
auto dstData = reinterpret_cast(outputVector->getData());
for (auto i = 0u; i < count; i++) {
auto pos = offset + i;
auto micro = sourceData[pos] / 1000;
auto days = micro / Interval::MICROS_PER_DAY;
micro = micro % Interval::MICROS_PER_DAY;
auto months = days / Interval::DAYS_PER_MONTH;
days = days % Interval::DAYS_PER_MONTH;
interval_t interval;
interval.months = months;
interval.days = days;
interval.micros = micro;
dstData[i] = interval;
outputVector->setNull(i, false /* isNull */);
}
break;
}
case NumpyNullableType::OBJECT: {
auto sourceData = (PyObject**)array.data();
if (outputVector->dataType.getLogicalTypeID() != LogicalTypeID::STRING) {
scanObjectColumn(sourceData, count, offset, outputVector);
return;
}
auto dstData = reinterpret_cast(outputVector->getData());
py::gil_scoped_acquire gil;
for (auto i = 0u; i < count; i++) {
auto pos = i + offset;
PyObject* val = sourceData[pos];
if (bindData->npType.type == NumpyNullableType::OBJECT &&
!py::isinstance<:str>(val)) {
if (val == Py_None ||
(py::isinstance<:float_>(val) && std::isnan(PyFloat_AsDouble(val)))) {
outputVector->setNull(pos, true /* isNull */);
continue;
}
if (!py::isinstance<:str>(val)) {
bindData->objectStrValContainer.push(std::move(py::str(val)));
val = reinterpret_cast(
bindData->objectStrValContainer.getLastAddedObject().ptr());
}
}
py::handle strHandle(val);
if (!py::isinstance<:str>(strHandle)) {
outputVector->setNull(i, true /* isNull */);
continue;
}
outputVector->setNull(i, false /* isNull */);
if (PyStrUtil::isPyUnicodeCompatibleAscii(strHandle)) {
dstData[i] = string_t{PyStrUtil::getUnicodeStrData(strHandle),
PyStrUtil::getUnicodeStrLen(strHandle)};
} else {
auto unicodeObj = reinterpret_cast(val);
if (unicodeObj->utf8) {
dstData[i] = string_t(reinterpret_cast(unicodeObj->utf8),
unicodeObj->utf8_length);
} else if (PyStrUtil::isPyUnicodeCompact(unicodeObj) &&
!PyStrUtil::isPyUnicodeASCII(unicodeObj)) {
auto kind = PyStrUtil::getPyUnicodeKind(strHandle);
switch (kind) {
case PyUnicode_1BYTE_KIND:
appendPythonUnicode(PyStrUtil::PyUnicode1ByteData(strHandle),
PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
break;
case PyUnicode_2BYTE_KIND:
appendPythonUnicode(PyStrUtil::PyUnicode2ByteData(strHandle),
PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
break;
case PyUnicode_4BYTE_KIND:
appendPythonUnicode(PyStrUtil::PyUnicode4ByteData(strHandle),
PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
break;
default:
UNREACHABLE_CODE;
}
} else {
// LCOV_EXCL_START
throw common::RuntimeException("Unsupported string format.");
// LCOC_EXCL_STOP
}
}
}
break;
}
default:
UNREACHABLE_CODE;
}
}
void scanNumpyObject(PyObject* object, uint64_t offset, common::ValueVector* outputVector) {
if (object == Py_None) {
outputVector->setNull(offset, true /* isNull */);
return;
}
outputVector->setNull(offset, false /* isNull */);
transformPythonValue(outputVector, offset, object);
}
// Converts objects col[offset, offset + count) into rows [0, count) of `outputVector`, one
// object at a time via scanNumpyObject. The GIL is acquired once for the whole batch because
// every row is passed to Python-object-based conversion.
void NumpyScan::scanObjectColumn(PyObject** col, uint64_t count, uint64_t offset,
    common::ValueVector* outputVector) {
    py::gil_scoped_acquire gil;
    PyObject** const firstObject = col + offset;
    for (uint64_t row = 0; row < count; ++row) {
        scanNumpyObject(firstObject[row], row, outputVector);
    }
}
} // namespace lbug