Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "refactor(ext/net): clean up variadic network ops (#16… #16422

Merged
merged 1 commit into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 158 additions & 84 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
Error,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
Symbol,
SymbolAsyncIterator,
SymbolFor,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;

Expand All @@ -38,30 +38,6 @@
return core.shutdown(rid);
}

function opAccept(rid, transport) {
return core.opAsync("op_net_accept", { rid, transport });
}

function opListen(args) {
return ops.op_net_listen(args);
}

function opConnect(args) {
return core.opAsync("op_net_connect", args);
}

function opReceive(rid, transport, zeroCopy) {
return core.opAsync(
"op_dgram_recv",
{ rid, transport },
zeroCopy,
);
}

function opSend(args, zeroCopy) {
return core.opAsync("op_dgram_send", args, zeroCopy);
}

function resolveDns(query, recordType, options) {
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
Expand Down Expand Up @@ -135,11 +111,6 @@

class UnixConn extends Conn {}

// Use symbols for method names to hide these in stable API.
// TODO(kt3k): Remove these symbols when ref/unref become stable.
const listenerRef = Symbol("listenerRef");
const listenerUnref = Symbol("listenerUnref");

class Listener {
#rid = 0;
#addr = null;
Expand All @@ -159,21 +130,35 @@
return this.#addr;
}

accept() {
const promise = opAccept(this.rid, this.addr.transport);
async accept() {
let promise;
switch (this.addr.transport) {
case "tcp":
promise = core.opAsync("op_net_accept_tcp", this.rid);
break;
case "unix":
promise = core.opAsync("op_net_accept_unix", this.rid);
break;
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
this.#promiseId = promise[promiseIdSymbol];
if (this.#unref) {
this.#unrefOpAccept();
if (this.#unref) core.unrefOp(this.#promiseId);
const [rid, localAddr, remoteAddr] = await promise;
this.#promiseId = null;
if (this.addr.transport == "tcp") {
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
} else {
throw new Error("unreachable");
}
return promise.then((res) => {
if (this.addr.transport == "tcp") {
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
} else {
throw new Error("unreachable");
}
});
}

async next() {
Expand Down Expand Up @@ -205,22 +190,15 @@
return this;
}

[listenerRef]() {
ref() {
this.#unref = false;
this.#refOpAccept();
}

[listenerUnref]() {
this.#unref = true;
this.#unrefOpAccept();
}

#refOpAccept() {
if (typeof this.#promiseId === "number") {
core.refOp(this.#promiseId);
}
}
#unrefOpAccept() {

unref() {
this.#unref = true;
if (typeof this.#promiseId === "number") {
core.unrefOp(this.#promiseId);
}
Expand All @@ -247,18 +225,54 @@

async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
const { size, remoteAddr } = await opReceive(
this.rid,
this.addr.transport,
buf,
);
const sub = TypedArrayPrototypeSubarray(buf, 0, size);
let nread;
let remoteAddr;
switch (this.addr.transport) {
case "udp": {
[nread, remoteAddr] = await core.opAsync(
"op_net_recv_udp",
this.rid,
buf,
);
remoteAddr.transport = "udp";
break;
}
case "unixpacket": {
let path;
[nread, path] = await core.opAsync(
"op_net_recv_unixpacket",
this.rid,
buf,
);
remoteAddr = { transport: "unixpacket", path };
break;
}
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
const sub = TypedArrayPrototypeSubarray(buf, 0, nread);
return [sub, remoteAddr];
}

send(p, addr) {
const args = { hostname: "127.0.0.1", ...addr, rid: this.rid };
return opSend(args, p);
async send(p, opts) {
switch (this.addr.transport) {
case "udp":
return await core.opAsync(
"op_net_send_udp",
this.rid,
{ hostname: opts.hostname ?? "127.0.0.1", port: opts.port },
p,
);
case "unixpacket":
return await core.opAsync(
"op_net_send_unixpacket",
this.rid,
opts.path,
p,
);
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
}

close() {
Expand All @@ -282,40 +296,100 @@
}
}

function listen({ hostname, ...options }, constructor = Listener) {
const res = opListen({
transport: "tcp",
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
...options,
});
function listen(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, addr] = ops.op_net_listen_tcp({
hostname: args.hostname ?? "0.0.0.0",
port: args.port,
});
addr.transport = "tcp";
return new Listener(rid, addr);
}
case "unix": {
const [rid, path] = ops.op_net_listen_unix(args.path);
const addr = {
transport: "unix",
path,
};
return new Listener(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

return new constructor(res.rid, res.localAddr);
function listenDatagram(args) {
switch (args.transport) {
case "udp": {
const [rid, addr] = ops.op_net_listen_udp(
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
args.reuseAddress ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);
}
case "unixpacket": {
const [rid, path] = ops.op_net_listen_unixpacket(args.path);
const addr = {
transport: "unixpacket",
path,
};
return new Datagram(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

async function connect(options) {
if (options.transport === "unix") {
const res = await opConnect(options);
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
async function connect(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_tcp",
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
}
case "unix": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_unix",
args.path,
);
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}

const res = await opConnect({
transport: "tcp",
hostname: "127.0.0.1",
...options,
});
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
function setup(unstable) {
if (!unstable) {
delete Listener.prototype.ref;
delete Listener.prototype.unref;
}
}

window.__bootstrap.net = {
setup,
connect,
Conn,
TcpConn,
UnixConn,
opConnect,
listen,
listenerRef,
listenerUnref,
opListen,
listenDatagram,
Listener,
shutdown,
Datagram,
Expand Down
Loading