Skip to content

Commit

Permalink
Revert "Revert "refactor(ext/net): clean up variadic network ops (den…
Browse files Browse the repository at this point in the history
…oland#16… (denoland#16422)

…392)" (denoland#16417)"

This reverts commit 8e3f825.
  • Loading branch information
bartlomieju authored Oct 25, 2022
1 parent 378e6a8 commit af62e08
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 726 deletions.
242 changes: 158 additions & 84 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
Error,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
Symbol,
SymbolAsyncIterator,
SymbolFor,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;

Expand All @@ -38,30 +38,6 @@
return core.shutdown(rid);
}

function opAccept(rid, transport) {
return core.opAsync("op_net_accept", { rid, transport });
}

function opListen(args) {
return ops.op_net_listen(args);
}

function opConnect(args) {
return core.opAsync("op_net_connect", args);
}

function opReceive(rid, transport, zeroCopy) {
return core.opAsync(
"op_dgram_recv",
{ rid, transport },
zeroCopy,
);
}

function opSend(args, zeroCopy) {
return core.opAsync("op_dgram_send", args, zeroCopy);
}

function resolveDns(query, recordType, options) {
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
Expand Down Expand Up @@ -135,11 +111,6 @@

class UnixConn extends Conn {}

// Use symbols for method names to hide these in stable API.
// TODO(kt3k): Remove these symbols when ref/unref become stable.
const listenerRef = Symbol("listenerRef");
const listenerUnref = Symbol("listenerUnref");

class Listener {
#rid = 0;
#addr = null;
Expand All @@ -159,21 +130,35 @@
return this.#addr;
}

accept() {
const promise = opAccept(this.rid, this.addr.transport);
async accept() {
let promise;
switch (this.addr.transport) {
case "tcp":
promise = core.opAsync("op_net_accept_tcp", this.rid);
break;
case "unix":
promise = core.opAsync("op_net_accept_unix", this.rid);
break;
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
this.#promiseId = promise[promiseIdSymbol];
if (this.#unref) {
this.#unrefOpAccept();
if (this.#unref) core.unrefOp(this.#promiseId);
const [rid, localAddr, remoteAddr] = await promise;
this.#promiseId = null;
if (this.addr.transport == "tcp") {
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
} else {
throw new Error("unreachable");
}
return promise.then((res) => {
if (this.addr.transport == "tcp") {
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
} else {
throw new Error("unreachable");
}
});
}

async next() {
Expand Down Expand Up @@ -205,22 +190,15 @@
return this;
}

[listenerRef]() {
ref() {
this.#unref = false;
this.#refOpAccept();
}

[listenerUnref]() {
this.#unref = true;
this.#unrefOpAccept();
}

#refOpAccept() {
if (typeof this.#promiseId === "number") {
core.refOp(this.#promiseId);
}
}
#unrefOpAccept() {

unref() {
this.#unref = true;
if (typeof this.#promiseId === "number") {
core.unrefOp(this.#promiseId);
}
Expand All @@ -247,18 +225,54 @@

async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
const { size, remoteAddr } = await opReceive(
this.rid,
this.addr.transport,
buf,
);
const sub = TypedArrayPrototypeSubarray(buf, 0, size);
let nread;
let remoteAddr;
switch (this.addr.transport) {
case "udp": {
[nread, remoteAddr] = await core.opAsync(
"op_net_recv_udp",
this.rid,
buf,
);
remoteAddr.transport = "udp";
break;
}
case "unixpacket": {
let path;
[nread, path] = await core.opAsync(
"op_net_recv_unixpacket",
this.rid,
buf,
);
remoteAddr = { transport: "unixpacket", path };
break;
}
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
const sub = TypedArrayPrototypeSubarray(buf, 0, nread);
return [sub, remoteAddr];
}

send(p, addr) {
const args = { hostname: "127.0.0.1", ...addr, rid: this.rid };
return opSend(args, p);
async send(p, opts) {
switch (this.addr.transport) {
case "udp":
return await core.opAsync(
"op_net_send_udp",
this.rid,
{ hostname: opts.hostname ?? "127.0.0.1", port: opts.port },
p,
);
case "unixpacket":
return await core.opAsync(
"op_net_send_unixpacket",
this.rid,
opts.path,
p,
);
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
}

close() {
Expand All @@ -282,40 +296,100 @@
}
}

function listen({ hostname, ...options }, constructor = Listener) {
const res = opListen({
transport: "tcp",
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
...options,
});
function listen(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, addr] = ops.op_net_listen_tcp({
hostname: args.hostname ?? "0.0.0.0",
port: args.port,
});
addr.transport = "tcp";
return new Listener(rid, addr);
}
case "unix": {
const [rid, path] = ops.op_net_listen_unix(args.path);
const addr = {
transport: "unix",
path,
};
return new Listener(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

return new constructor(res.rid, res.localAddr);
function listenDatagram(args) {
switch (args.transport) {
case "udp": {
const [rid, addr] = ops.op_net_listen_udp(
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
args.reuseAddress ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);
}
case "unixpacket": {
const [rid, path] = ops.op_net_listen_unixpacket(args.path);
const addr = {
transport: "unixpacket",
path,
};
return new Datagram(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

async function connect(options) {
if (options.transport === "unix") {
const res = await opConnect(options);
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
async function connect(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_tcp",
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
}
case "unix": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_unix",
args.path,
);
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

const res = await opConnect({
transport: "tcp",
hostname: "127.0.0.1",
...options,
});
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
function setup(unstable) {
if (!unstable) {
delete Listener.prototype.ref;
delete Listener.prototype.unref;
}
}

window.__bootstrap.net = {
setup,
connect,
Conn,
TcpConn,
UnixConn,
opConnect,
listen,
listenerRef,
listenerUnref,
opListen,
listenDatagram,
Listener,
shutdown,
Datagram,
Expand Down
Loading

0 comments on commit af62e08

Please sign in to comment.