Skip to content

Commit 8a1649a

Browse files
authored
ARROW-17063: [GLib] Add examples to send/receive record batches via network (#13590)
Authored-by: Sutou Kouhei <[email protected]> Signed-off-by: Sutou Kouhei <[email protected]>
1 parent b71e503 commit 8a1649a

File tree

4 files changed

+408
-1
lines changed

4 files changed

+408
-1
lines changed

c_glib/arrow-glib/field.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ garrow_field_import(gpointer c_abi_schema, GError **error)
160160
/**
161161
* garrow_field_new:
162162
* @name: The name of the field.
163-
* @data_type: The data type of the field.
163+
* @data_type: (transfer full): The data type of the field.
164164
*
165165
* Returns: A newly created #GArrowField.
166166
*/

c_glib/example/meson.build

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,20 @@ executable('read-file', 'read-file.c',
2929
executable('read-stream', 'read-stream.c',
3030
dependencies: [arrow_glib],
3131
link_language: 'c')
32+
executable('receive-network', 'receive-network.c',
33+
dependencies: [arrow_glib],
34+
link_language: 'c')
35+
executable('send-network', 'send-network.c',
36+
dependencies: [arrow_glib],
37+
link_language: 'c')
3238

3339
install_data('README.md',
3440
'build.c',
3541
'extension-type.c',
3642
'read-file.c',
3743
'read-stream.c',
44+
'receive-network.c',
45+
'send-network.c',
3846
install_dir: join_paths(data_dir, meson.project_name(), 'example'))
3947

4048
subdir('lua')

c_glib/example/receive-network.c

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <stdlib.h>
21+
22+
#include <arrow-glib/arrow-glib.h>
23+
24+
#ifdef G_OS_UNIX
25+
# include <glib-unix.h>
26+
# include <signal.h>
27+
#endif
28+
29+
static void
30+
service_event(GSocketListener *listener,
31+
GSocketListenerEvent event,
32+
GSocket *socket,
33+
gpointer user_data)
34+
{
35+
if (event != G_SOCKET_LISTENER_BOUND) {
36+
return;
37+
}
38+
39+
GError *error = NULL;
40+
GSocketAddress* local_address = g_socket_get_local_address(socket, &error);
41+
if (!local_address) {
42+
g_print("failed to get local address: %s\n", error->message);
43+
g_error_free(error);
44+
g_object_unref(socket);
45+
return;
46+
}
47+
gchar *local_address_string =
48+
g_socket_connectable_to_string(G_SOCKET_CONNECTABLE(local_address));
49+
g_print("address: %s\n", local_address_string);
50+
g_free(local_address_string);
51+
g_object_unref(local_address);
52+
}
53+
54+
static void
55+
print_array(GArrowArray *array)
56+
{
57+
GArrowType value_type;
58+
gint64 i, n;
59+
60+
value_type = garrow_array_get_value_type(array);
61+
62+
g_print("[");
63+
n = garrow_array_get_length(array);
64+
65+
#define ARRAY_CASE(type, Type, TYPE, format) \
66+
case GARROW_TYPE_ ## TYPE: \
67+
{ \
68+
GArrow ## Type ## Array *real_array; \
69+
real_array = GARROW_ ## TYPE ## _ARRAY(array); \
70+
for (i = 0; i < n; i++) { \
71+
if (i > 0) { \
72+
g_print(", "); \
73+
} \
74+
g_print(format, \
75+
garrow_ ## type ## _array_get_value(real_array, i)); \
76+
} \
77+
} \
78+
break
79+
80+
switch (value_type) {
81+
ARRAY_CASE(uint8, UInt8, UINT8, "%hhu");
82+
ARRAY_CASE(uint16, UInt16, UINT16, "%" G_GUINT16_FORMAT);
83+
ARRAY_CASE(uint32, UInt32, UINT32, "%" G_GUINT32_FORMAT);
84+
ARRAY_CASE(uint64, UInt64, UINT64, "%" G_GUINT64_FORMAT);
85+
ARRAY_CASE( int8, Int8, INT8, "%hhd");
86+
ARRAY_CASE( int16, Int16, INT16, "%" G_GINT16_FORMAT);
87+
ARRAY_CASE( int32, Int32, INT32, "%" G_GINT32_FORMAT);
88+
ARRAY_CASE( int64, Int64, INT64, "%" G_GINT64_FORMAT);
89+
ARRAY_CASE( float, Float, FLOAT, "%g");
90+
ARRAY_CASE(double, Double, DOUBLE, "%g");
91+
default:
92+
break;
93+
}
94+
#undef ARRAY_CASE
95+
96+
g_print("]\n");
97+
}
98+
99+
static void
100+
print_record_batch(GArrowRecordBatch *record_batch)
101+
{
102+
guint nth_column, n_columns;
103+
104+
n_columns = garrow_record_batch_get_n_columns(record_batch);
105+
for (nth_column = 0; nth_column < n_columns; nth_column++) {
106+
GArrowArray *array;
107+
108+
g_print("columns[%u](%s): ",
109+
nth_column,
110+
garrow_record_batch_get_column_name(record_batch, nth_column));
111+
array = garrow_record_batch_get_column_data(record_batch, nth_column);
112+
print_array(array);
113+
g_object_unref(array);
114+
}
115+
}
116+
117+
static gboolean
118+
service_incoming(GSocketService *service,
119+
GSocketConnection *connection,
120+
GObject *source_object,
121+
gpointer user_data)
122+
{
123+
GArrowGIOInputStream *input =
124+
garrow_gio_input_stream_new(
125+
g_io_stream_get_input_stream(G_IO_STREAM(connection)));
126+
GError *error = NULL;
127+
GArrowRecordBatchStreamReader *reader =
128+
garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(input), &error);
129+
if (!reader) {
130+
g_print("failed to create reader: %s\n", error->message);
131+
g_error_free(error);
132+
g_object_unref(input);
133+
return FALSE;
134+
}
135+
136+
while (TRUE) {
137+
GArrowRecordBatch *record_batch =
138+
garrow_record_batch_reader_read_next(GARROW_RECORD_BATCH_READER(reader),
139+
&error);
140+
if (error) {
141+
g_print("failed to read the next record batch: %s\n", error->message);
142+
g_error_free(error);
143+
g_object_unref(reader);
144+
g_object_unref(input);
145+
return EXIT_FAILURE;
146+
}
147+
148+
if (!record_batch) {
149+
break;
150+
}
151+
152+
print_record_batch(record_batch);
153+
g_object_unref(record_batch);
154+
}
155+
156+
g_object_unref(reader);
157+
g_object_unref(input);
158+
159+
return FALSE;
160+
}
161+
162+
#ifdef G_OS_UNIX
163+
typedef struct {
164+
GSocketService *service;
165+
GMainLoop *loop;
166+
} StopData;
167+
168+
static gboolean
169+
stop(gpointer user_data)
170+
{
171+
StopData* data = user_data;
172+
g_object_unref(data->service);
173+
g_main_loop_quit(data->loop);
174+
return G_SOURCE_REMOVE;
175+
}
176+
#endif
177+
178+
int
179+
main(int argc, char **argv)
180+
{
181+
GSocketService *service = g_threaded_socket_service_new(-1);
182+
g_signal_connect(service, "event", G_CALLBACK(service_event), NULL);
183+
g_signal_connect(service, "incoming", G_CALLBACK(service_incoming), NULL);
184+
185+
GError *error = NULL;
186+
gboolean success =
187+
g_socket_listener_add_any_inet_port(G_SOCKET_LISTENER(service),
188+
NULL,
189+
&error);
190+
if (!success) {
191+
g_print("failed to add a listen IP address: %s\n", error->message);
192+
g_error_free(error);
193+
return EXIT_FAILURE;
194+
}
195+
196+
g_socket_service_start(service);
197+
198+
GMainLoop *loop = g_main_loop_new(NULL, FALSE);
199+
#ifdef G_OS_UNIX
200+
StopData data;
201+
data.service = service;
202+
data.loop = loop;
203+
g_unix_signal_add(SIGINT, stop, &data);
204+
g_unix_signal_add(SIGTERM, stop, &data);
205+
#else
206+
/* TODO: Implement graceful stop. */
207+
#endif
208+
g_main_loop_run(loop);
209+
g_main_loop_unref(loop);
210+
211+
return EXIT_SUCCESS;
212+
}

0 commit comments

Comments
 (0)