forked from rjpcomputing/luaforwindows
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCoSocket.lua
More file actions
executable file
·416 lines (348 loc) · 19 KB
/
CoSocket.lua
File metadata and controls
executable file
·416 lines (348 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
--------------------------------------------------------------------------------
---------------------- ## ##### ##### ###### -----------------------
---------------------- ## ## ## ## ## ## ## -----------------------
---------------------- ## ## ## ## ## ###### -----------------------
---------------------- ## ## ## ## ## ## -----------------------
---------------------- ###### ##### ##### ## -----------------------
---------------------- -----------------------
----------------------- Lua Object-Oriented Programming ------------------------
--------------------------------------------------------------------------------
-- Project: LOOP Class Library --
-- Release: 2.3 beta --
-- Title : Lua Socket Wrapper for Cooperative Scheduling --
-- Author : Renato Maia <[email protected]> --
--------------------------------------------------------------------------------
--[[VERBOSE]] local verbose = require("loop.thread.Scheduler").verbose
--[[VERBOSE]] verbose.groups.concurrency[#verbose.groups.concurrency+1] = "cosocket"
--[[VERBOSE]] verbose:newlevel{"cosocket"}
local ipairs = ipairs
local assert = assert
local setmetatable = setmetatable
local type = type
local next = next
local coroutine = require "coroutine"
local oo = require "loop.base"
local Wrapper = require "loop.object.Wrapper"
module("loop.thread.CoSocket", oo.class)
--------------------------------------------------------------------------------
-- Initialization Code ---------------------------------------------------------
--------------------------------------------------------------------------------
function __init(class, self, scheduler)
self = oo.rawnew(class, self)
self.readlocks = {}
self.writelocks = {}
if not self.scheduler then
self.scheduler = scheduler
end
return self
end
function __index(self, field)
return _M[field] or self.socketapi[field]
end
--------------------------------------------------------------------------------
-- Wrapping functions ----------------------------------------------------------
--------------------------------------------------------------------------------
local function wrappedsettimeout(self, timeout)
self.timeout = timeout or false
end
--------------------------------------------------------------------------------
local function wrappedconnect(self, host, port) --[[VERBOSE]] local verbose = self.cosocket.scheduler.verbose
local socket = self.__object --[[VERBOSE]] verbose:cosocket(true, "performing blocking connect")
socket:settimeout(-1)
local result, errmsg = socket:connect(host, port)
socket:settimeout(0) --[[VERBOSE]] verbose:cosocket(false, "blocking connect done")
return result, errmsg
end
--------------------------------------------------------------------------------
local function wrappedaccept(self)
local socket = self.__object
local timeout = self.timeout
local cosocket = self.cosocket
local readlocks = cosocket.readlocks
local scheduler = cosocket.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped accept")
assert(socket, "bad argument #1 to `accept' (wrapped socket expected)")
assert(readlocks[socket] == nil, "attempt to read a socket in use")
local conn, errmsg = socket:accept()
if conn then --[[VERBOSE]] verbose:cosocket(false, "connection accepted without waiting")
return cosocket:wrap(conn)
elseif timeout == 0 or errmsg ~= "timeout" then --[[VERBOSE]] verbose:cosocket(false, "returning error ",errmsg," without waiting")
return nil, errmsg
end --[[VERBOSE]] verbose:cosocket(true, "waiting for results")
local sleeping = scheduler.sleeping
local reading = scheduler.reading
-- subscribing current thread for reading signal
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
-- lock socket for reading and wait for signal until timeout
readlocks[socket] = current
scheduler:suspend(timeout) --[[VERBOSE]] verbose:cosocket(false, "wrapped accept resumed")
readlocks[socket] = nil
-- if thread is still blocked for reading then waiting timed out
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
return nil, "timeout" --[[VERBOSE]] , verbose:cosocket(false, "waiting timed out")
elseif timeout then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end --[[VERBOSE]] verbose:cosocket(false, "returing results after waiting")
return cosocket:wrap(socket:accept())
end
--------------------------------------------------------------------------------
local function wrappedreceive(self, pattern)
local socket = self.__object
local timeout = self.timeout
local readlocks = self.cosocket.readlocks
local scheduler = self.cosocket.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped receive")
assert(socket, "bad argument #1 to `receive' (wrapped socket expected)")
assert(readlocks[socket] == nil, "attempt to read a socket in use")
-- get data already avaliable
local result, errmsg, partial = socket:receive(pattern)
-- check if job has completed
if not result and errmsg == "timeout" and timeout ~= 0 then --[[VERBOSE]] verbose:cosocket(true, "waiting for remaining of results")
local running = scheduler.running
local sleeping = scheduler.sleeping
local reading = scheduler.reading
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- lock socket to avoid use by other coroutines
readlocks[socket] = true
-- block current thread on the socket
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
-- reduce the number of required bytes
if type(pattern) == "number" then
pattern = pattern - #partial --[[VERBOSE]] verbose:cosocket("amount of required bytes reduced to ",pattern)
end
repeat
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket(false, "wrapped receive resumed")
-- check if the socket is ready
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
errmsg = "timeout" --[[VERBOSE]] verbose:cosocket(false, "wrapped send timed out")
else --[[VERBOSE]] verbose:cosocket "reading more data from socket"
local newdata
result, errmsg, newdata = socket:receive(pattern)
if result then --[[VERBOSE]] verbose:cosocket "received all requested data"
result, errmsg, partial = partial..result, nil, nil --[[VERBOSE]] verbose:cosocket(false, "returning results after waiting")
else --[[VERBOSE]] verbose:cosocket "received only partial data"
partial = partial..newdata
if errmsg == "timeout" then
-- block current thread on the socket for more data
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for another read signal")
-- reduce the number of required bytes
if type(pattern) == "number" then
pattern = pattern - #newdata --[[VERBOSE]] verbose:cosocket("amount of required bytes reduced to ",pattern)
end
-- cancel error message
errmsg = nil --[[VERBOSE]] else verbose:cosocket(false, "returning error ",errmsg," after waiting")
end
end
end
until result or errmsg
-- remove from sleeping queue if it was waken because of data on socket.
if timeout and timeout > 0 and errmsg ~= "timeout" then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end
-- unlock socket to allow use by other coroutines
readlocks[socket] = nil --[[VERBOSE]] else verbose:cosocket(false, "returning results without waiting")
end
return result, errmsg, partial
end
--------------------------------------------------------------------------------
local function wrappedsend(self, data, i, j) --[[VERBOSE]] local verbose = self.cosocket.scheduler.verbose
local socket = self.__object --[[VERBOSE]] verbose:cosocket(true, "performing wrapped send")
local timeout = self.timeout
local writelocks = self.cosocket.writelocks
local scheduler = self.cosocket.scheduler
local current = scheduler:checkcurrent()
assert(socket, "bad argument #1 to `send' (wrapped socket expected)")
assert(writelocks[socket] == nil, "attempt to write a socket in use")
-- fill buffer space already avaliable
local sent, errmsg, lastbyte = socket:send(data, i, j)
-- check if job has completed
if not sent and errmsg == "timeout" and timeout ~= 0 then --[[VERBOSE]] verbose:cosocket(true, "waiting to send remaining data")
local running = scheduler.running
local sleeping = scheduler.sleeping
local writing = scheduler.writing
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- lock socket to avoid use by other coroutines
writelocks[socket] = true
-- block current thread on the socket
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for write signal")
repeat
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket "wrapped send resumed"
-- check if the socket is ready
if writing[socket] == current then
writing:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for write signal")
errmsg = "timeout" --[[VERBOSE]] verbose:cosocket "wrapped send timed out"
else --[[VERBOSE]] verbose:cosocket "writing remaining data into socket"
sent, errmsg, lastbyte = socket:send(data, lastbyte+1, j)
if not sent and errmsg == "timeout" then
-- block current thread on the socket to write data
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for another write signal")
-- cancel error message
errmsg = nil --[[VERBOSE]] elseif sent then verbose:cosocket "sent all supplied data" else verbose:cosocket("returning error ",errmsg," after waiting")
end
end
until sent or errmsg
-- remove from sleeping queue, if it was waken because of data on socket.
if timeout and timeout > 0 and errmsg ~= "timeout" then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end
-- unlock socket to allow use by other coroutines
writelocks[socket] = nil --[[VERBOSE]] verbose:cosocket "send done after waiting" else verbose:cosocket(false, "send done without waiting")
end
return sent, errmsg, lastbyte
end
--------------------------------------------------------------------------------
-- Wrapped Socket API ----------------------------------------------------------
--------------------------------------------------------------------------------
function select(self, recvt, sendt, timeout)
local scheduler = self.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped select")
if (recvt and #recvt > 0) or (sendt and #sendt > 0) then
local readlocks = self.readlocks
local writelocks = self.writelocks
-- assert that no thread is already blocked on these sockets
if recvt then
local new = {}
for index, wrapper in ipairs(recvt) do
local socket = wrapper.__object
assert(readlocks[socket] == nil, "attempt to read a socket in use")
new[index] = socket
new[socket] = wrapper
end
recvt = new
end
if sendt then
local new = {}
for index, wrapper in ipairs(sendt) do
local socket = wrapper.__object
assert(writelocks[socket] == nil, "attempt to write a socket in use")
new[index] = socket
new[socket] = wrapper
end
sendt = new
end
local readok, writeok, errmsg = scheduler.select(recvt, sendt, 0)
if
timeout ~= 0 and
errmsg == "timeout" and
next(readok) == nil and
next(writeok) == nil
then --[[VERBOSE]] verbose:cosocket(true, "waiting for ready socket selection")
local running = scheduler.running
local sleeping = scheduler.sleeping
local reading = scheduler.reading
local writing = scheduler.writing
-- block current thread on the sockets and lock them
if recvt then
for _, socket in ipairs(recvt) do
readlocks[socket] = current
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
end
end
if sendt then
for _, socket in ipairs(sendt) do
writelocks[socket] = current
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for write signal")
end
end
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket(false, "wrapped select resumed")
-- remove from sleeping queue, if it was waken because of data on socket.
if timeout and timeout > 0 then
if sleeping:remove(current)
then errmsg = nil --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
else errmsg = "timeout" --[[VERBOSE]] verbose:cosocket "wrapped select timed out"
end
end
-- check which sockets are ready and remove block for other sockets
if recvt then
for _, socket in ipairs(recvt) do
readlocks[socket] = nil
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
else
local wrapper = recvt[socket]
readok[#readok+1] = wrapper
readok[wrapper] = true
end
end
end
if sendt then
for _, socket in ipairs(sendt) do
writelocks[socket] = nil
if writing[socket] == current then
writing:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for write signal")
else
local wrapper = sendt[socket]
writeok[#writeok+1] = wrapper
writeok[wrapper] = true
end
end
end
else
for index, socket in ipairs(readok) do
local wrapper = recvt[socket]
readok[index] = wrapper
readok[socket] = nil
readok[wrapper] = true
end
for index, socket in ipairs(writeok) do
local wrapper = sendt[socket]
writeok[index] = wrapper
writeok[socket] = nil
writeok[wrapper] = true
end
end --[[VERBOSE]] verbose:cosocket(false, "returning selected sockets after waiting")
return readok, writeok, errmsg
else --[[VERBOSE]] verbose:cosocket(false, "no sockets for selection")
return {}, {}
end
end
function sleep(self, timeout)
assert(timeout, "bad argument #1 to `sleep' (number expected)")
return self.scheduler:suspend(timeout)
end
function tcp(self)
return self:wrap(self.socketapi.tcp())
end
function udp(self)
return self:wrap(self.socketapi.udp())
end
function connect(self, address, port)
return self:wrap(self.socketapi.connect(address, port))
end
function bind(self, address, port)
return self:wrap(self.socketapi.bind(address, port))
end
function wrap(self, socket, ...) --[[VERBOSE]] self.scheduler.verbose:cosocket "new wrapped socket"
if socket then
socket:settimeout(0)
socket = Wrapper {
__object = socket,
cosocket = self,
timeout = false,
settimeout = wrappedsettimeout,
connect = wrappedconnect,
accept = wrappedaccept,
send = wrappedsend,
receive = wrappedreceive,
}
end
return socket, ...
end