#include "numpy/numpy_scan.h"

#include <cmath>
#include <cstring>

#include "common/exception/runtime.h"
#include "common/type_utils.h"
#include "common/types/timestamp_t.h"
#include "pandas/pandas_bind.h"
#include "py_conversion.h"
#include "py_str_utils.h"
#include "utf8proc_wrapper.h"

namespace lbug {

using namespace lbug::common;

// Copies `count` elements of type T, starting at element `offset` of `npArray`, into the data
// buffer of `outputVector`. The null mask of `outputVector` is not touched.
template<typename T>
void ScanNumpyColumn(py::array& npArray, uint64_t offset, ValueVector* outputVector,
    uint64_t count) {
    // py::array::data() hands back a const void*; the buffer is only read here.
    auto srcData = static_cast<const T*>(npArray.data());
    std::memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
}

// Scans a fixed-width numpy-backed column of element type T into `outputVector`.
// A column carrying a mask is not supported here and hits UNREACHABLE_CODE.
template<typename T>
void scanNumpyMasked(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
    ValueVector* outputVector) {
    DASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
    auto& npColumn = reinterpret_cast<PandasNumpyColumn&>(*bindData->pandasCol);
    ScanNumpyColumn<T>(npColumn.array, offset, outputVector, count);
    if (bindData->mask != nullptr) {
        UNREACHABLE_CODE;
    }
}

// Marks position `pos` of `outputVector` as null when `value` is NaN. Non-NaN values leave the
// null mask unchanged.
template<typename T>
void setNullIfNan(T value, uint64_t pos, ValueVector* outputVector) {
    if (std::isnan(value)) {
        outputVector->setNull(pos, true /* isNull */);
    }
}

// Copies `count` floating-point values starting at `srcData + offset` into `outputVector`, then
// flags every NaN entry as null.
// NOTE(review): entries that are not NaN are never explicitly reset to non-null here; this
// presumably relies on the caller handing in a vector with a clean null mask -- confirm.
template<typename T>
void ScanNumpyFpColumn(const T* srcData, uint64_t count, uint64_t offset,
    ValueVector* outputVector) {
    std::memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
    for (uint64_t i = 0; i < count; ++i) {
        setNullIfNan(outputVector->getValue<T>(i), i, outputVector);
    }
}

// Encodes `codepointLength` code points from `codepoints` as UTF-8 and stores the result as the
// string at position `pos` of `vectorToAppend`.
template<typename T>
static void appendPythonUnicode(T* codepoints, uint64_t codepointLength,
    ValueVector* vectorToAppend, uint64_t pos) {
    // First pass: compute the encoded byte length so the string can be reserved in one go.
    uint64_t utf8StrLen = 0;
    for (uint64_t i = 0; i < codepointLength; ++i) {
        auto len = utf8proc::Utf8Proc::codepointLength(int(codepoints[i]));
        DASSERT(len >= 1);
        utf8StrLen += len;
    }
    auto& strToAppend = StringVector::reserveString(vectorToAppend, pos, utf8StrLen);
    // utf8proc_codepoint_to_utf8 requires that:
    // 1. codePointLen must be an int.
    // 2. dataToWrite must be a char*.
    int codePointLen = 0;
    auto dataToWrite = (char*)strToAppend.getData();
    // Second pass: encode each code point directly into the reserved buffer.
    for (uint64_t i = 0; i < codepointLength; ++i) {
        utf8proc::Utf8Proc::codepointToUtf8(int(codepoints[i]), codePointLen, dataToWrite);
        DASSERT(codePointLen >= 1);
        dataToWrite += codePointLen;
    }
    // For strings that are not short, refresh the prefix from the bytes just written.
    if (!string_t::isShortString(utf8StrLen)) {
        std::memcpy(strToAppend.prefix, strToAppend.getData(), string_t::PREFIX_LENGTH);
    }
}

// Scans `count` rows, starting at source row `offset`, of the numpy-backed column described by
// `bindData` into `outputVector`. Source rows are addressed as `offset + i`; the destination
// (data slot and null flag) is always addressed by the zero-based index `i`.
//
// Throws common::RuntimeException for a Python string whose internal layout is neither
// ASCII-compatible, UTF-8 cached, nor compact.
void NumpyScan::scan(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
    common::ValueVector* outputVector) {
    DASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
    auto& npCol = reinterpret_cast<PandasNumpyColumn&>(*bindData->pandasCol);
    auto& array = npCol.array;

    switch (bindData->npType.type) {
    case NumpyNullableType::BOOL:
        scanNumpyMasked<bool>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::UINT_8:
        scanNumpyMasked<uint8_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::UINT_16:
        scanNumpyMasked<uint16_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::UINT_32:
        scanNumpyMasked<uint32_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::UINT_64:
        scanNumpyMasked<uint64_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::INT_8:
        scanNumpyMasked<int8_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::INT_16:
        scanNumpyMasked<int16_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::INT_32:
        scanNumpyMasked<int32_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::INT_64:
        scanNumpyMasked<int64_t>(bindData, count, offset, outputVector);
        break;
    case NumpyNullableType::FLOAT_32:
        ScanNumpyFpColumn<float>(reinterpret_cast<const float*>(array.data()), count, offset,
            outputVector);
        break;
    case NumpyNullableType::FLOAT_64:
        ScanNumpyFpColumn<double>(reinterpret_cast<const double*>(array.data()), count, offset,
            outputVector);
        break;
    case NumpyNullableType::DATETIME_S:
    case NumpyNullableType::DATETIME_MS:
    case NumpyNullableType::DATETIME_NS:
    case NumpyNullableType::DATETIME_US: {
        // The raw 64-bit value is stored as-is; no unit conversion happens here.
        // NOTE(review): NaT sentinels are not mapped to null -- confirm this is intended.
        auto sourceData = reinterpret_cast<const int64_t*>(array.data());
        auto dstData = reinterpret_cast<timestamp_t*>(outputVector->getData());
        for (uint64_t i = 0; i < count; ++i) {
            auto pos = offset + i;
            dstData[i].value = sourceData[pos];
            outputVector->setNull(i, false /* isNull */);
        }
        break;
    }
    case NumpyNullableType::TIMEDELTA: {
        auto sourceData = reinterpret_cast<const int64_t*>(array.data());
        auto dstData = reinterpret_cast<interval_t*>(outputVector->getData());
        for (uint64_t i = 0; i < count; ++i) {
            auto pos = offset + i;
            // Presumably the source is in nanoseconds, hence the division by 1000 -- TODO
            // confirm against the bind step.
            auto micro = sourceData[pos] / 1000;
            auto days = micro / Interval::MICROS_PER_DAY;
            micro = micro % Interval::MICROS_PER_DAY;
            auto months = days / Interval::DAYS_PER_MONTH;
            days = days % Interval::DAYS_PER_MONTH;
            interval_t interval;
            interval.months = months;
            interval.days = days;
            interval.micros = micro;
            dstData[i] = interval;
            outputVector->setNull(i, false /* isNull */);
        }
        break;
    }
    case NumpyNullableType::OBJECT: {
        // scanObjectColumn() takes a non-const PyObject**, so constness is dropped explicitly.
        auto sourceData = static_cast<PyObject**>(const_cast<void*>(array.data()));
        if (outputVector->dataType.getLogicalTypeID() != LogicalTypeID::STRING) {
            scanObjectColumn(sourceData, count, offset, outputVector);
            return;
        }
        auto dstData = reinterpret_cast<string_t*>(outputVector->getData());
        py::gil_scoped_acquire gil;
        for (uint64_t i = 0; i < count; ++i) {
            auto pos = i + offset;
            PyObject* val = sourceData[pos];
            if (!py::isinstance<py::str>(val)) {
                if (val == Py_None ||
                    (py::isinstance<py::float_>(val) && std::isnan(PyFloat_AsDouble(val)))) {
                    // Null flags are indexed by the output position `i`, not by the source
                    // position `pos`; using `pos` would mark the wrong slot when offset != 0.
                    outputVector->setNull(i, true /* isNull */);
                    continue;
                }
                // Stringify any other object. The resulting py::str is parked in the bind
                // data's container and `val` is re-pointed at that stored object.
                bindData->objectStrValContainer.push(py::str(val));
                val = reinterpret_cast<PyObject*>(
                    bindData->objectStrValContainer.getLastAddedObject().ptr());
            }
            py::handle strHandle(val);
            if (!py::isinstance<py::str>(strHandle)) {
                outputVector->setNull(i, true /* isNull */);
                continue;
            }
            outputVector->setNull(i, false /* isNull */);
            if (PyStrUtil::isPyUnicodeCompatibleAscii(strHandle)) {
                dstData[i] = string_t{PyStrUtil::getUnicodeStrData(strHandle),
                    PyStrUtil::getUnicodeStrLen(strHandle)};
            } else {
                auto unicodeObj = reinterpret_cast<PyCompactUnicodeObject*>(val);
                if (unicodeObj->utf8) {
                    // A UTF-8 representation is already present on the object; use it.
                    dstData[i] = string_t(reinterpret_cast<const char*>(unicodeObj->utf8),
                        unicodeObj->utf8_length);
                } else if (PyStrUtil::isPyUnicodeCompact(unicodeObj) &&
                           !PyStrUtil::isPyUnicodeASCII(unicodeObj)) {
                    // Otherwise transcode from the fixed-width code point storage.
                    auto kind = PyStrUtil::getPyUnicodeKind(strHandle);
                    switch (kind) {
                    case PyUnicode_1BYTE_KIND:
                        appendPythonUnicode<Py_UCS1>(PyStrUtil::PyUnicode1ByteData(strHandle),
                            PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
                        break;
                    case PyUnicode_2BYTE_KIND:
                        appendPythonUnicode<Py_UCS2>(PyStrUtil::PyUnicode2ByteData(strHandle),
                            PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
                        break;
                    case PyUnicode_4BYTE_KIND:
                        appendPythonUnicode<Py_UCS4>(PyStrUtil::PyUnicode4ByteData(strHandle),
                            PyStrUtil::getUnicodeStrLen(strHandle), outputVector, i);
                        break;
                    default:
                        UNREACHABLE_CODE;
                    }
                } else {
                    // LCOV_EXCL_START
                    throw common::RuntimeException("Unsupported string format.");
                    // LCOV_EXCL_STOP
                }
            }
        }
        break;
    }
    default:
        UNREACHABLE_CODE;
    }
}

// Writes a single Python object into position `offset` of `outputVector`: Py_None becomes null,
// anything else is marked non-null and handed to transformPythonValue().
void scanNumpyObject(PyObject* object, uint64_t offset, common::ValueVector* outputVector) {
    if (object == Py_None) {
        outputVector->setNull(offset, true /* isNull */);
        return;
    }
    outputVector->setNull(offset, false /* isNull */);
    transformPythonValue(outputVector, offset, object);
}

// Scans `count` Python objects starting at `col[offset]` into positions [0, count) of
// `outputVector`. The GIL is acquired for the duration of the scan.
void NumpyScan::scanObjectColumn(PyObject** col, uint64_t count, uint64_t offset,
    common::ValueVector* outputVector) {
    py::gil_scoped_acquire gil;
    auto srcPtr = col + offset;
    for (uint64_t i = 0; i < count; ++i) {
        scanNumpyObject(srcPtr[i], i, outputVector);
    }
}

} // namespace lbug