#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
+#include <net/tcp.h>
#include <asm/uaccess.h>
static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
-static void xprt_remove_pending(struct rpc_xprt *);
#ifdef RPC_DEBUG_DATA
/*
return result;
}
-/*
- * Read data from socket
- */
-static int
-xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
-{
- struct socket *sock = xprt->sock;
- struct msghdr msg;
- mm_segment_t oldfs;
- struct iovec niv[MAX_IOVEC];
- int result;
-
- if (!sock)
- return -ENOTCONN;
-
- msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
- msg.msg_iov = iov;
- msg.msg_iovlen = nr;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- /* Adjust the iovec if we've already filled it */
- if (shift)
- xprt_move_iov(&msg, niv, shift);
-
- oldfs = get_fs(); set_fs(get_ds());
- result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
- set_fs(oldfs);
-
- dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
- iov, len, result);
- return result;
-}
-
-
/*
* Adjust RPC congestion window
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
{
dprintk("RPC: disconnected transport %p\n", xprt);
xprt_clear_connected(xprt);
- xprt_remove_pending(xprt);
rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
xprt->tcp_offset = 0;
xprt->tcp_reclen = 0;
xprt->tcp_copied = 0;
- xprt->tcp_more = 0;
+ xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
wake_up_interruptible(sk->sleep);
}
+typedef struct {
+ struct sk_buff *skb;
+ unsigned offset;
+ size_t count;
+} skb_reader_t;
+
/*
- * TCP read fragment marker
+ * Copy from an skb into memory and shrink the skb.
*/
-static inline int
-tcp_read_fraghdr(struct rpc_xprt *xprt)
+static inline size_t
+tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
- struct iovec riov;
- int want, result;
-
- if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
- goto done;
-
- want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
- dprintk("RPC: reading header (%d bytes)\n", want);
- do {
- riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
- riov.iov_len = want;
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_offset += result;
- want -= result;
- } while (want);
-
- /* Get the record length and mask out the last fragment bit */
- xprt->tcp_reclen = ntohl(xprt->tcp_recm);
- xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
- xprt->tcp_reclen &= 0x7fffffff;
-
- dprintk("RPC: New record reclen %d morefrags %d\n",
- xprt->tcp_reclen, xprt->tcp_more);
- done:
- return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+ if (len > desc->count)
+ len = desc->count;
+ skb_copy_bits(desc->skb, desc->offset, p, len);
+ desc->offset += len;
+ desc->count -= len;
+ return len;
}
/*
- * TCP read xid
+ * TCP read fragment marker
*/
-static inline int
-tcp_read_xid(struct rpc_xprt *xprt, int avail)
+static inline void
+tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct iovec riov;
- int want, result;
-
- if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
- goto done;
- want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
- do {
- dprintk("RPC: reading xid (%d bytes)\n", want);
- riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
- riov.iov_len = want;
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_copied += result;
- xprt->tcp_offset += result;
- want -= result;
- avail -= result;
- } while (want);
- done:
- return avail;
+ size_t len, used;
+ char *p;
+
+ p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
+ len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_offset += used;
+ if (used != len)
+ return;
+ xprt->tcp_reclen = ntohl(xprt->tcp_recm);
+ if (xprt->tcp_reclen & 0x80000000)
+ xprt->tcp_flags |= XPRT_LAST_FRAG;
+ else
+ xprt->tcp_flags &= ~XPRT_LAST_FRAG;
+ xprt->tcp_reclen &= 0x7fffffff;
+ xprt->tcp_flags &= ~XPRT_COPY_RECM;
+ xprt->tcp_offset = 0;
+ /* Sanity check of the record length */
+ if (xprt->tcp_reclen < 4) {
+ printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+ xprt_disconnect(xprt);
+ }
+ dprintk("RPC: reading TCP record fragment of length %d\n",
+ xprt->tcp_reclen);
}
-/*
- * TCP read and complete request
- */
-static inline int
-tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
+static void
+tcp_check_recm(struct rpc_xprt *xprt)
{
- int want, result;
-
- if (req->rq_rlen <= xprt->tcp_copied || !avail)
- goto done;
- want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
- do {
- dprintk("RPC: %4d TCP receiving %d bytes\n",
- req->rq_task->tk_pid, want);
-
- result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
- if (result < 0)
- return result;
- xprt->tcp_copied += result;
- xprt->tcp_offset += result;
- avail -= result;
- want -= result;
- } while (want);
-
- done:
- if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
- return avail;
- dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
- xprt_complete_rqst(xprt, req, xprt->tcp_copied);
-
- return avail;
+ if (xprt->tcp_offset == xprt->tcp_reclen) {
+ xprt->tcp_flags |= XPRT_COPY_RECM;
+ xprt->tcp_offset = 0;
+ if (xprt->tcp_flags & XPRT_LAST_FRAG) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ xprt->tcp_flags |= XPRT_COPY_XID;
+ xprt->tcp_copied = 0;
+ }
+ }
}
/*
- * TCP discard extra bytes from a short read
+ * TCP read xid
*/
-static inline int
-tcp_read_discard(struct rpc_xprt *xprt, int avail)
+static inline void
+tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct iovec riov;
- static u8 dummy[64];
- int want, result = 0;
-
- while (avail) {
- want = min_t(unsigned int, avail, sizeof(dummy));
- riov.iov_base = dummy;
- riov.iov_len = want;
- dprintk("RPC: TCP skipping %d bytes\n", want);
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_offset += result;
- avail -= result;
- }
- return avail;
+ size_t len, used;
+ char *p;
+
+ len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
+ dprintk("RPC: reading XID (%Zu bytes)\n", len);
+ p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_offset += used;
+ if (used != len)
+ return;
+ xprt->tcp_flags &= ~XPRT_COPY_XID;
+ xprt->tcp_flags |= XPRT_COPY_DATA;
+ xprt->tcp_copied = 4;
+ dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
+ tcp_check_recm(xprt);
}
/*
- * TCP record receive routine
- * This is not the most efficient code since we call recvfrom thrice--
- * first receiving the record marker, then the XID, then the data.
- *
- * The optimal solution would be a RPC support in the TCP layer, which
- * would gather all data up to the next record marker and then pass us
- * the list of all TCP segments ready to be copied.
+ * TCP read and complete request
*/
-static int
-tcp_input_record(struct rpc_xprt *xprt)
+static inline void
+tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct rpc_rqst *req = NULL;
- struct rpc_task *task = NULL;
- int avail, result;
-
- dprintk("RPC: tcp_input_record\n");
-
- if (xprt->shutdown)
- return -EIO;
- if (!xprt_connected(xprt))
- return -ENOTCONN;
-
- /* Read in a new fragment marker if necessary */
- /* Can we ever really expect to get completely empty fragments? */
- if ((result = tcp_read_fraghdr(xprt)) < 0)
- return result;
- avail = result;
-
- /* Read in the xid if necessary */
- if ((result = tcp_read_xid(xprt, avail)) < 0)
- return result;
- if (!(avail = result))
- goto out_ok;
+ struct rpc_rqst *req;
+ struct iovec *iov;
+ char *p;
+ unsigned long skip;
+ size_t len, used;
+ int n;
/* Find and lock the request corresponding to this xid */
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
- if (req) {
- task = req->rq_task;
- /* Read in the request data */
- result = tcp_read_request(xprt, req, avail);
- rpc_unlock_task(task);
- if (result < 0)
- return result;
- avail = result;
+ if (!req) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ dprintk("RPC: XID %08x request not found!\n",
+ xprt->tcp_xid);
+ return;
}
-
- /* Skip over any trailing bytes on short reads */
- if ((result = tcp_read_discard(xprt, avail)) < 0)
- return result;
-
- out_ok:
- dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
- xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
- result = xprt->tcp_reclen;
- xprt->tcp_reclen = 0;
- xprt->tcp_offset = 0;
- if (!xprt->tcp_more)
- xprt->tcp_copied = 0;
- return result;
-}
-
-/*
- * TCP task queue stuff
- */
-LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */
-
-static inline
-void tcp_rpciod_queue(void)
-{
- rpciod_wake_up();
-}
-
-int xprt_tcp_pending(void)
-{
- int retval;
-
- spin_lock_bh(&rpc_queue_lock);
- retval = !list_empty(&rpc_xprt_pending);
- spin_unlock_bh(&rpc_queue_lock);
- return retval;
-}
-
-static inline
-void xprt_append_pending(struct rpc_xprt *xprt)
-{
- spin_lock_bh(&rpc_queue_lock);
- if (list_empty(&xprt->rx_pending)) {
- list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
- dprintk("RPC: xprt queue %p\n", xprt);
- tcp_rpciod_queue();
+ skip = xprt->tcp_copied;
+ iov = req->rq_rvec;
+ for (n = req->rq_rnr; n != 0; n--, iov++) {
+ if (skip >= iov->iov_len) {
+ skip -= iov->iov_len;
+ continue;
+ }
+ p = iov->iov_base;
+ len = iov->iov_len;
+ if (skip) {
+ p += skip;
+ len -= skip;
+ skip = 0;
+ }
+ if (xprt->tcp_offset + len > xprt->tcp_reclen)
+ len = xprt->tcp_reclen - xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_copied += used;
+ xprt->tcp_offset += used;
+ if (used != len)
+ break;
+ if (xprt->tcp_copied == req->rq_rlen) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ break;
+ }
+ if (xprt->tcp_offset == xprt->tcp_reclen) {
+ if (xprt->tcp_flags & XPRT_LAST_FRAG)
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ break;
+ }
}
- spin_unlock_bh(&rpc_queue_lock);
-}
-static
-void xprt_remove_pending(struct rpc_xprt *xprt)
-{
- spin_lock_bh(&rpc_queue_lock);
- if (!list_empty(&xprt->rx_pending)) {
- list_del(&xprt->rx_pending);
- INIT_LIST_HEAD(&xprt->rx_pending);
+ if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
+ dprintk("RPC: %4d received reply complete\n",
+ req->rq_task->tk_pid);
+ xprt_complete_rqst(xprt, req, xprt->tcp_copied);
}
- spin_unlock_bh(&rpc_queue_lock);
+ rpc_unlock_task(req->rq_task);
+ tcp_check_recm(xprt);
}
-static inline
-struct rpc_xprt *xprt_remove_pending_next(void)
+/*
+ * TCP discard extra bytes from a short read
+ */
+static inline void
+tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct rpc_xprt *xprt = NULL;
-
- spin_lock_bh(&rpc_queue_lock);
- if (!list_empty(&rpc_xprt_pending)) {
- xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
- list_del(&xprt->rx_pending);
- INIT_LIST_HEAD(&xprt->rx_pending);
- }
- spin_unlock_bh(&rpc_queue_lock);
- return xprt;
+ size_t len;
+
+ len = xprt->tcp_reclen - xprt->tcp_offset;
+ if (len > desc->count)
+ len = desc->count;
+ desc->count -= len;
+ desc->offset += len;
+ xprt->tcp_offset += len;
+ tcp_check_recm(xprt);
}
/*
- * This is protected from tcp_data_ready and the stack as its run
- * inside of the RPC I/O daemon
+ * TCP record receive routine
+ * We first have to grab the record marker, then the XID, then the data.
*/
-void
-__rpciod_tcp_dispatcher(void)
+static int
+tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
+ unsigned int offset, size_t len)
{
- struct rpc_xprt *xprt;
- int safe_retry = 0, result;
-
- dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
- /*
- * Empty each pending socket
- */
- while ((xprt = xprt_remove_pending_next()) != NULL) {
- dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
- do {
- result = tcp_input_record(xprt);
- } while (result >= 0);
+ struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
+ skb_reader_t desc = { skb, offset, len };
- if (safe_retry++ > 200) {
- schedule();
- safe_retry = 0;
+ dprintk("RPC: tcp_data_recv\n");
+ do {
+ /* Read in a new fragment marker if necessary */
+ /* Can we ever really expect to get completely empty fragments? */
+ if (xprt->tcp_flags & XPRT_COPY_RECM) {
+ tcp_read_fraghdr(xprt, &desc);
+ continue;
}
- }
+ /* Read in the xid if necessary */
+ if (xprt->tcp_flags & XPRT_COPY_XID) {
+ tcp_read_xid(xprt, &desc);
+ continue;
+ }
+ /* Read in the request data */
+ if (xprt->tcp_flags & XPRT_COPY_DATA) {
+ tcp_read_request(xprt, &desc);
+ continue;
+ }
+ /* Skip over any trailing bytes on short reads */
+ tcp_read_discard(xprt, &desc);
+ } while (desc.count && xprt_connected(xprt));
+ dprintk("RPC: tcp_data_recv done\n");
+ return len - desc.count;
}
-/*
- * data_ready callback for TCP. We can't just jump into the
- * tcp recvmsg functions inside of the network receive bh or
- * bad things occur. We queue it to pick up after networking
- * is done.
- */
-
-static void tcp_data_ready(struct sock *sk, int len)
+static void tcp_data_ready(struct sock *sk, int bytes)
{
- struct rpc_xprt *xprt;
+ struct rpc_xprt *xprt;
+ read_descriptor_t rd_desc;
dprintk("RPC: tcp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
- {
- printk("Not a socket with xprt %p\n", sk);
- goto out;
+ if (!(xprt = xprt_from_sock(sk))) {
+ printk("RPC: tcp_data_ready socket info not found!\n");
+ return;
}
-
if (xprt->shutdown)
- goto out;
-
- xprt_append_pending(xprt);
+ return;
- dprintk("RPC: tcp_data_ready client %p\n", xprt);
- dprintk("RPC: state %x conn %d dead %d zapped %d\n",
- sk->state, xprt_connected(xprt),
- sk->dead, sk->zapped);
- out:
- if (sk->sleep && waitqueue_active(sk->sleep))
- wake_up_interruptible(sk->sleep);
+ /* We use rd_desc to pass struct xprt to tcp_data_recv */
+ rd_desc.buf = (char *)xprt;
+ rd_desc.count = 65536;
+ tcp_read_sock(sk, &rd_desc, tcp_data_recv);
}
-
static void
tcp_state_change(struct sock *sk)
{
req->rq_next = NULL;
xprt->free = xprt->slot;
- INIT_LIST_HEAD(&xprt->rx_pending);
-
dprintk("RPC: created transport %p\n", xprt);
xprt_bind_socket(xprt, sock);