Skip to content

Commit

Permalink
Merge pull request rapidsai#203 from dantegd/enh-ext-join-hash-api-up…
Browse files Browse the repository at this point in the history
…date

[REVIEW] Update to support hash join, gdf_context and new libgdf api
  • Loading branch information
kkraus14 authored Aug 17, 2018
2 parents a9eac7b + 97c3085 commit df369e6
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 33 deletions.
29 changes: 23 additions & 6 deletions pygdf/_gdf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2018, NVIDIA CORPORATION.

"""
This file provides bindings to the libgdf library.
"""
Expand Down Expand Up @@ -134,10 +136,14 @@ def apply_sort(col_keys, col_vals, ascending=True):


# Map a join ``how`` name to the libgdf entry point that implements it.
# Note: 'inner' and 'left' use the newer multi-column libgdf API
# (gdf_inner_join / gdf_left_join), which also takes a gdf_context;
# 'outer' still uses the legacy single-column "generic" API.
_join_how_api = {
    'inner': libgdf.gdf_inner_join,
    'outer': libgdf.gdf_outer_join_generic,
    'left': libgdf.gdf_left_join,
}

# Map a join ``method`` name to the corresponding libgdf method enum.
_join_method_api = dict(
    sort=libgdf.GDF_SORT,
    hash=libgdf.GDF_HASH,
)


Expand All @@ -153,7 +159,7 @@ def _as_numba_devarray(intaddr, nelem, dtype):


@contextlib.contextmanager
def apply_join(col_lhs, col_rhs, how):
def apply_join(col_lhs, col_rhs, how, method='hash'):
"""Returns a tuple of the left and right joined indices as gpu arrays.
"""
if(len(col_lhs) != len(col_rhs)):
Expand All @@ -162,16 +168,27 @@ def apply_join(col_lhs, col_rhs, how):

joiner = _join_how_api[how]
join_result_ptr = ffi.new("gdf_join_result_type**", None)
method_api = _join_method_api[method]
gdf_context = ffi.new('gdf_context*')

if(how == 'left'):
if method == 'hash':
libgdf.gdf_context_view(gdf_context, 0, method_api, 0)
elif method == 'sort':
libgdf.gdf_context_view(gdf_context, 1, method_api, 0)
else:
msg = "method not supported"
raise ValueError(msg)

if(how in ['left', 'inner']):
list_lhs = []
list_rhs = []
for i in range(len(col_lhs)):
list_lhs.append(col_lhs[i].cffi_view)
list_rhs.append(col_rhs[i].cffi_view)

# Call libgdf
joiner(len(col_lhs), list_lhs, list_rhs, join_result_ptr)

joiner(len(col_lhs), list_lhs, list_rhs, join_result_ptr, gdf_context)
else:
joiner(col_lhs[0].cffi_view, col_rhs[0].cffi_view, join_result_ptr)

Expand Down
8 changes: 6 additions & 2 deletions pygdf/categorical.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2018, NVIDIA CORPORATION.

import pandas as pd
import numpy as np

Expand Down Expand Up @@ -208,7 +210,8 @@ def _decode(self, value):
def default_na_value(self):
return -1

def join(self, other, how='left', return_indexers=False):
def join(self, other, how='left', return_indexers=False,
method='hash'):
if not isinstance(other, CategoricalColumn):
raise TypeError('*other* is not a categorical column')
if self._ordered != other._ordered or self._ordered:
Expand All @@ -235,7 +238,8 @@ def join(self, other, how='left', return_indexers=False):
# Do join as numeric column
join_result = self.as_numerical.join(
other.as_numerical, how=how,
return_indexers=return_indexers)
return_indexers=return_indexers,
method=method)

if return_indexers:
joined_index, indexers = join_result
Expand Down
10 changes: 6 additions & 4 deletions pygdf/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ def _merge_gdf(self, left, right, left_on, right_on, how, return_indices):
return joined_indices

def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
sort=False):
sort=False, method='hash'):
"""Join columns with other DataFrame on index or on a key column.
Parameters
Expand Down Expand Up @@ -790,10 +790,11 @@ def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
'lsuffix and rsuffix are not defined')

return self._join(other=other, how=how, lsuffix=lsuffix,
rsuffix=rsuffix, sort=sort, same_names=same_names)
rsuffix=rsuffix, sort=sort, same_names=same_names,
method=method)

def _join(self, other, how, lsuffix, rsuffix, sort, same_names,
rightjoin=False):
method='hash', rightjoin=False):
if how == 'right':
# libgdf doesn't support right join directly, we will swap the
# dfs and use left join
Expand Down Expand Up @@ -823,7 +824,8 @@ def gather_empty(outdf, indf, idx, joinidx, suffix):
df = DataFrame()

joined_index, indexers = lhs.index.join(rhs.index, how=how,
return_indexers=True)
return_indexers=True,
method=method)
gather_fn = (gather_cols if len(joined_index) else gather_empty)
lidx = indexers[0].to_gpu_array()
ridx = indexers[1].to_gpu_array()
Expand Down
5 changes: 3 additions & 2 deletions pygdf/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ def __eq__(self, other):
res = lhs.unordered_compare('eq', rhs).all()
return res

def join(self, other, how='left', return_indexers=False):
def join(self, other, method, how='left', return_indexers=False):
column_join_res = self.as_column().join(
other.as_column(), how=how, return_indexers=return_indexers)
other.as_column(), how=how, return_indexers=return_indexers,
method=method)
if return_indexers:
joined_col, indexers = column_join_res
joined_index = GenericIndex(joined_col)
Expand Down
56 changes: 46 additions & 10 deletions pygdf/numerical.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,24 +235,61 @@ def default_na_value(self):
raise TypeError(
"numeric column of {} has no NaN value".format(self.dtype))

def join(self, other, how='left', return_indexers=False, method='sort'):
    """Join this column with *other*, dispatching on *method*.

    Parameters
    ----------
    other : NumericalColumn
        The column to join against.
    how : {'left', 'inner', 'outer', ...}
        Join type; forwarded to the underlying implementation.
    return_indexers : bool
        If True, also return the (left, right) indexers.
    method : {'sort', 'hash'}
        Which libgdf implementation to use.

    Raises
    ------
    ValueError
        If *method* is neither 'sort' nor 'hash'.
    """
    # 'outer' is only supported by the sort-based implementation,
    # so it overrides a requested hash method.
    if method == 'sort' or how == 'outer':
        return self._sortjoin(other=other, how=how,
                              return_indexers=return_indexers)
    elif method == 'hash':
        return self._hashjoin(other=other, how=how,
                              return_indexers=return_indexers)
    else:
        raise ValueError('Unsupported join method')

def _hashjoin(self, other, how='left', return_indexers=False):
msg = "Hash based join on index not implemented yet."
raise NotImplementedError(msg)
return

from .series import Series

if not self.is_type_equivalent(other):
raise TypeError('*other* is not compatible')

with _gdf.apply_join(
[self], [other], how=how, method='hash') as (lidx, ridx):
if lidx.size > 0:
raw_index = cudautils.gather_joined_index(
self.to_gpu_array(),
other.to_gpu_array(),
lidx,
ridx,
)
buf_index = Buffer(raw_index)
else:
buf_index = Buffer.null(dtype=self.dtype)

joined_index = self.replace(data=buf_index)

if return_indexers:
def gather(idxrange, idx):
mask = (Series(idx) != -1).as_mask()
return idxrange.take(idx).set_mask(mask).fillna(-1)

if len(joined_index) > 0:
indexers = (
gather(Series(range(0, len(self))), lidx),
gather(Series(range(0, len(other))), ridx),
)
else:
indexers = (
Series(Buffer.null(dtype=np.intp)),
Series(Buffer.null(dtype=np.intp))
)
return joined_index, indexers
else:
return joined_index
# return

def _sortjoin(self, other, how='left', return_indexers=False):
"""Join with another column.
Expand All @@ -267,9 +304,8 @@ def _sortjoin(self, other, how='left', return_indexers=False):

lkey, largsort = self.sort_by_values(True)
rkey, rargsort = other.sort_by_values(True)
if how == 'left':
how = 'left-compat'
with _gdf.apply_join([lkey], [rkey], how=how) as (lidx, ridx):
with _gdf.apply_join(
[lkey], [rkey], how=how, method='sort') as (lidx, ridx):
if lidx.size > 0:
raw_index = cudautils.gather_joined_index(
lkey.to_gpu_array(),
Expand Down
44 changes: 35 additions & 9 deletions pygdf/tests/test_joining.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,56 @@ def make_params():
np.random.seed(0)

hows = 'left,inner,outer,right'.split(',')
methods = 'hash,sort'.split(',')

# Test specific cases (1)
aa = [0, 0, 4, 5, 5]
bb = [0, 0, 2, 3, 5]
for how in hows:
yield (aa, bb, how)
if how in ['left', 'inner', 'right']:
for method in methods:
yield (aa, bb, how, method)
else:
yield(aa, bb, how, 'sort')

# Test specific cases (2)
aa = [0, 0, 1, 2, 3]
bb = [0, 1, 2, 2, 3]
for how in hows:
yield (aa, bb, how)
if how in ['left', 'inner', 'right']:
for method in methods:
yield (aa, bb, how, method)
else:
yield(aa, bb, how, 'sort')

# Test large random integer inputs
aa = np.random.randint(0, 50, 100)
bb = np.random.randint(0, 50, 100)
for how in hows:
yield (aa, bb, how)
if how in ['left', 'inner', 'right']:
for method in methods:
yield (aa, bb, how, method)
else:
yield(aa, bb, how, 'sort')

# Test floating point inputs
aa = np.random.random(50)
bb = np.random.random(50)
for how in hows:
yield (aa, bb, how)
if how in ['left', 'inner', 'right']:
for method in methods:
yield (aa, bb, how, method)
else:
yield(aa, bb, how, 'sort')


@pytest.mark.parametrize('aa,bb,how', make_params())
def test_dataframe_join_how(aa, bb, how):
@pytest.mark.parametrize('aa,bb,how,method', make_params())
def test_dataframe_join_how(aa, bb, how, method):
df = DataFrame()
df['a'] = aa
df['b'] = bb

def work(df):
def work_pandas(df):
ts = timer()
df1 = df.set_index('a')
df2 = df.set_index('b')
Expand All @@ -55,8 +72,17 @@ def work(df):
print('timing', type(df), te - ts)
return joined

expect = work(df.to_pandas())
got = work(df)
def work_gdf(df):
ts = timer()
df1 = df.set_index('a')
df2 = df.set_index('b')
joined = df1.join(df2, how=how, sort=True, method=method)
te = timer()
print('timing', type(df), te - ts)
return joined

expect = work_pandas(df.to_pandas())
got = work_gdf(df)
expecto = expect.copy()
goto = got.copy()

Expand Down

0 comments on commit df369e6

Please sign in to comment.