@@ 30,34 30,6 @@ var
errno {.importc, header: "<errno.h>".}: OSErrorCode
EINTR {.importc, header: "<errno.h>".}: cint
-template retryOnEIntr*(op: untyped): untyped =
- ## Given a POSIX operation that returns `-1` on error, automatically retry it
- ## if the error was `EINTR`.
- # Taken from nim-sys.
- var result: typeof(op)
- while true:
- result = op
-
- if cint(result) != -1: break
- let err = errno
- if err != EINTR.OSErrorCode:
- raise newOSError(err)
-
- result
-
-template asyncRetry*(op: untyped): untyped =
- # Taken from nim-sys.
- var result: typeof(op)
- while true:
- result = op
-
- if cint(result) != -1: break
- let err = uint16 errno
- if err notin {EINTR, EAGAIN, EWOULDBLOCK}:
- raise newOSError(OSErrorCode err)
-
- result
-
proc toEndpoint(family: IpAddressFamily; sa: var Sockaddr_storage; sl: SockLen): RemoteSpecifier =
case family:
of IpAddressFamily.IPv6:
@@ 97,8 69,14 @@ proc connect(sock: SocketHandle; remote: RemoteSpecifier) =
sl: SockLen
toSockAddr(remote.ip, remote.port, sa, sl)
sock.setBlocking(false)
- let n = asyncRetry:
- sock.connect(cast[ptr SockAddr](addr sa), sl)
+ var n: int
+ while true:
+ n = sock.connect(cast[ptr SockAddr](addr sa), sl)
+ if n < 0:
+ let err = uint16 errno
+ if err notin {EINTR, EAGAIN, EWOULDBLOCK}:
+ raise newOSError(OSErrorCode err)
+ else: break
if n < 0:
raise newOSError(errno)
@@ 137,7 115,6 @@ proc initiateTCP(preconn: Preconnection; conn: Connection) {.asyncio.} =
of IpAddressFamily.IPv4: Domain.AF_INET
conn.platform.socket = createNativeSocket(
domain, SockType.SOCK_STREAM, Protocol.IPPROTO_TCP)
- # conn.platform.socket.setSockOptInt(SOL_SOCKET, cint OptReuseAddr, 1)
conn.platform.socket.connect(preconn.remotes[i])
tapsEcho "Connection -> Ready"
conn.ready()
@@ 179,6 156,7 @@ proc acceptTcpConn(lis: Listener; family: IpAddressFamily; socket: SocketHandle)
if err notin {EINTR, EAGAIN, EWOULDBLOCK}:
raise newOSError(OSErrorCode err)
wait(SocketFD socket, Event.Read)
+ result.platform.socket.setBlocking(false)
result.remote = some toEndpoint(family, sa, sl)
proc acceptTcp(lis: Listener; i: int; local: LocalSpecifier) {.asyncio.} =
@@ 284,7 262,6 @@ proc listen*(conn: Connection): Listener =
proc send*(
conn: Connection; msg: pointer; msgLen: int;
ctx = MessageContext(); endOfMessage = true) =
- tapsEcho "send ", msgLen, " bytes through TAPS"
try:
if conn.transport.isTcp:
if msgLen > 0:
@@ 342,32 319,38 @@ proc receiveAsync(conn: Connection; minIncompleteLength, maxLength: int) {.async
if conn.remote.isSome:
remote = get(conn.remote)
assert(buf.len > 0)
- let bytesRead = asyncRetry:
+ var bytesRead: int
+ while true:
if connectionLess:
- conn.platform.socket.recvfrom(
+ bytesRead = conn.platform.socket.recvfrom(
buf[0].addr, buf.len, 0,
cast[ptr Sockaddr](saddr.addr), saddrLen.addr)
else:
- conn.platform.socket.recv(buf[0].addr, buf.len, 0)
- if bytesRead < 0:
- tapsEcho "Connection -> ReceiveError<messageContext, reason?>"
- conn.receiveError(ctx, newOSError(errno))
+ bytesRead = conn.platform.socket.recv(buf[0].addr, buf.len, 0)
+ if bytesRead < 0:
+ let err = uint16 errno
+ if err notin {EINTR, EAGAIN, EWOULDBLOCK}:
+ tapsEcho "Connection -> ReceiveError<messageContext, reason?>"
+ conn.receiveError(ctx, newOSError(OSErrorCode errno))
+ return
+ wait(SocketFD conn.platform.socket, Event.Read)
+ else:
+ break
+ if connectionless:
+ fromSockAddr(saddr, saddrLen, remote.ip, remote.port)
+ ctx.remote = some remote
+ if bytesRead == 0:
+ close conn.platform.socket
+ conn.closed()
+ elif bytesRead < minIncompleteLength:
+ raiseAssert "recv less than minIncompleteLength"
else:
tapsEcho "Connection -> Received<messageData, messageContext>"
- if connectionless:
- fromSockAddr(saddr, saddrLen, remote.ip, remote.port)
- ctx.remote = some remote
- if bytesRead == 0:
- close conn.platform.socket
- conn.closed()
- elif bytesRead < minIncompleteLength:
- raiseAssert "recv less than minIncompleteLength"
- else:
- buf.setLen(bytesRead)
- if conn.transport.isUdp:
- conn.received(buf, ctx)
- elif conn.transport.isTcp:
- conn.receivedPartial(buf, ctx, false)
+ buf.setLen(bytesRead)
+ if conn.transport.isUdp:
+ conn.received(buf, ctx)
+ elif conn.transport.isTcp:
+ conn.receivedPartial(buf, ctx, false)
proc receive*(conn: Connection;
minIncompleteLength = -1; maxLength = -1) =