]> git.hungrycats.org Git - linux/commitdiff
[PATCH] Do RPC over TCP reply message delivery in sock->data_ready()
authorTrond Myklebust <trond.myklebust@fys.uio.no>
Sat, 4 May 2002 04:28:41 +0000 (21:28 -0700)
committerLinus Torvalds <torvalds@home.transmeta.com>
Sat, 4 May 2002 04:28:41 +0000 (21:28 -0700)
xprt.c:
  Speed up synchronous RPC over TCP calls by having the
  replies delivered by the IPV4 "bottom half", instead of
  switching to the rpciod process in order to call recvmsg().
   - Remove sock_recvmsg() interface.
   - Remove rpc_xprt_pending list and rpciod_tcp_dispatcher() interface.
   - Use the new tcp_read_sock() interface to deliver data directly
     from within tcp_data_ready().
sched.c:
   - Remove references to rpciod_tcp_dispatcher.
xprt.h:
   - New set of flags to reflect the TCP record read state.

Cheers,
  Trond

include/linux/sunrpc/xprt.h
net/sunrpc/sched.c
net/sunrpc/xprt.c

index 2b18426a1de98c44562e11c1a1a83d57237032a2..f1a329b62b43f342ce13608fb208e3a7bbd9295a 100644 (file)
@@ -120,6 +120,11 @@ struct rpc_rqst {
 #define rq_rnr                 rq_rcv_buf.io_nr
 #define rq_rlen                        rq_rcv_buf.io_len
 
+#define XPRT_LAST_FRAG         (1 << 0)
+#define XPRT_COPY_RECM         (1 << 1)
+#define XPRT_COPY_XID          (1 << 2)
+#define XPRT_COPY_DATA         (1 << 3)
+
 struct rpc_xprt {
        struct socket *         sock;           /* BSD socket layer */
        struct sock *           inet;           /* INET layer */
@@ -140,18 +145,17 @@ struct rpc_xprt {
        unsigned long           sockstate;      /* Socket state */
        unsigned char           shutdown   : 1, /* being shut down */
                                nocong     : 1, /* no congestion control */
-                               stream     : 1, /* TCP */
-                               tcp_more   : 1; /* more record fragments */
+                               stream     : 1; /* TCP */
 
        /*
         * State of TCP reply receive stuff
         */
-       u32                     tcp_recm;       /* Fragment header */
-       u32                     tcp_xid;        /* Current XID */
-       unsigned int            tcp_reclen,     /* fragment length */
-                               tcp_offset,     /* fragment offset */
-                               tcp_copied;     /* copied to request */
-       struct list_head        rx_pending;     /* receive pending list */
+       u32                     tcp_recm,       /* Fragment header */
+                               tcp_xid,        /* Current XID */
+                               tcp_reclen,     /* fragment length */
+                               tcp_offset;     /* fragment offset */
+       unsigned long           tcp_copied,     /* copied to request */
+                               tcp_flags;
 
        /*
         * Send stuff
@@ -185,8 +189,6 @@ int                 xprt_adjust_timeout(struct rpc_timeout *);
 void                   xprt_release(struct rpc_task *);
 void                   xprt_reconnect(struct rpc_task *);
 int                    xprt_clear_backlog(struct rpc_xprt *);
-int                    xprt_tcp_pending(void);
-void                   __rpciod_tcp_dispatcher(void);
 
 #define XPRT_WSPACE    0
 #define XPRT_CONNECT   1
@@ -200,13 +202,6 @@ void                       __rpciod_tcp_dispatcher(void);
 #define xprt_test_and_set_connected(xp)        (test_and_set_bit(XPRT_CONNECT, &(xp)->sockstate))
 #define xprt_clear_connected(xp)       (clear_bit(XPRT_CONNECT, &(xp)->sockstate))
 
-static inline
-void rpciod_tcp_dispatcher(void)
-{
-       if (xprt_tcp_pending())
-               __rpciod_tcp_dispatcher();
-}
-
 #endif /* __KERNEL__*/
 
 #endif /* _LINUX_SUNRPC_XPRT_H */
index c75a5cd47e82afad103116332ed0699aae64ca93..2a3b2000c8b87a820ade2ab9f56a9c8814c68f51 100644 (file)
@@ -704,9 +704,6 @@ __rpc_schedule(void)
 
        dprintk("RPC:      rpc_schedule enter\n");
        while (1) {
-               /* Ensure equal rights for tcp tasks... */
-               rpciod_tcp_dispatcher();
-
                spin_lock_bh(&rpc_queue_lock);
 
                task_for_first(task, &schedq.tasks) {
@@ -1030,7 +1027,7 @@ static DECLARE_MUTEX_LOCKED(rpciod_running);
 static inline int
 rpciod_task_pending(void)
 {
-       return !list_empty(&schedq.tasks) || xprt_tcp_pending();
+       return !list_empty(&schedq.tasks);
 }
 
 
index 8f3f3fe5fbb5751185abcc782df0f59d294658c0..7c58a3ed7c4a52d27adbd9a7cb56cbb453a9c736 100644 (file)
@@ -64,6 +64,7 @@
 #include <net/sock.h>
 #include <net/checksum.h>
 #include <net/udp.h>
+#include <net/tcp.h>
 
 #include <asm/uaccess.h>
 
@@ -88,7 +89,6 @@ static void   xprt_disconnect(struct rpc_xprt *);
 static void    xprt_reconn_status(struct rpc_task *task);
 static struct socket *xprt_create_socket(int, struct rpc_timeout *);
 static int     xprt_bind_socket(struct rpc_xprt *, struct socket *);
-static void    xprt_remove_pending(struct rpc_xprt *);
 
 #ifdef RPC_DEBUG_DATA
 /*
@@ -269,43 +269,6 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
        return result;
 }
 
-/*
- * Read data from socket
- */
-static int
-xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
-{
-       struct socket   *sock = xprt->sock;
-       struct msghdr   msg;
-       mm_segment_t    oldfs;
-       struct iovec    niv[MAX_IOVEC];
-       int             result;
-
-       if (!sock)
-               return -ENOTCONN;
-
-       msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
-       msg.msg_iov     = iov;
-       msg.msg_iovlen  = nr;
-       msg.msg_name    = NULL;
-       msg.msg_namelen = 0;
-       msg.msg_control = NULL;
-       msg.msg_controllen = 0;
-
-       /* Adjust the iovec if we've already filled it */
-       if (shift)
-               xprt_move_iov(&msg, niv, shift);
-
-       oldfs = get_fs(); set_fs(get_ds());
-       result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
-       set_fs(oldfs);
-
-       dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
-                                               iov, len, result);
-       return result;
-}
-
-
 /*
  * Adjust RPC congestion window
  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
@@ -423,7 +386,6 @@ xprt_disconnect(struct rpc_xprt *xprt)
 {
        dprintk("RPC:      disconnected transport %p\n", xprt);
        xprt_clear_connected(xprt);
-       xprt_remove_pending(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
 }
 
@@ -473,7 +435,7 @@ xprt_reconnect(struct rpc_task *task)
        xprt->tcp_offset = 0;
        xprt->tcp_reclen = 0;
        xprt->tcp_copied = 0;
-       xprt->tcp_more = 0;
+       xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
 
        /* Now connect it asynchronously. */
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
@@ -716,309 +678,229 @@ udp_data_ready(struct sock *sk, int len)
                wake_up_interruptible(sk->sleep);
 }
 
+typedef struct {
+       struct sk_buff *skb;
+       unsigned offset;
+       size_t count;
+} skb_reader_t;
+
 /*
- * TCP read fragment marker
+ * Copy from an skb into memory and shrink the skb.
  */
-static inline int
-tcp_read_fraghdr(struct rpc_xprt *xprt)
+static inline size_t
+tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
 {
-       struct iovec    riov;
-       int             want, result;
-
-       if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
-               goto done;
-
-       want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
-       dprintk("RPC:      reading header (%d bytes)\n", want);
-       do {
-               riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
-               riov.iov_len  = want;
-               result = xprt_recvmsg(xprt, &riov, 1, want, 0);
-               if (result < 0)
-                       return result;
-               xprt->tcp_offset += result;
-               want -= result;
-       } while (want);
-
-       /* Get the record length and mask out the last fragment bit */
-       xprt->tcp_reclen = ntohl(xprt->tcp_recm);
-       xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
-       xprt->tcp_reclen &= 0x7fffffff;
-
-       dprintk("RPC:      New record reclen %d morefrags %d\n",
-                                  xprt->tcp_reclen, xprt->tcp_more);
- done:
-       return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+       if (len > desc->count)
+               len = desc->count;
+       skb_copy_bits(desc->skb, desc->offset, p, len);
+       desc->offset += len;
+       desc->count -= len;
+       return len;
 }
 
 /*
- * TCP read xid
+ * TCP read fragment marker
  */
-static inline int
-tcp_read_xid(struct rpc_xprt *xprt, int avail)
+static inline void
+tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
-       struct iovec    riov;
-       int             want, result;
-
-       if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
-               goto done;
-       want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
-       do {
-               dprintk("RPC:      reading xid (%d bytes)\n", want);
-               riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
-               riov.iov_len  = want;
-               result = xprt_recvmsg(xprt, &riov, 1, want, 0);
-               if (result < 0)
-                       return result;
-               xprt->tcp_copied += result;
-               xprt->tcp_offset += result;
-               want  -= result;
-               avail -= result;
-       } while (want);
- done:
-       return avail;
+       size_t len, used;
+       char *p;
+
+       p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
+       len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+       used = tcp_copy_data(desc, p, len);
+       xprt->tcp_offset += used;
+       if (used != len)
+               return;
+       xprt->tcp_reclen = ntohl(xprt->tcp_recm);
+       if (xprt->tcp_reclen & 0x80000000)
+               xprt->tcp_flags |= XPRT_LAST_FRAG;
+       else
+               xprt->tcp_flags &= ~XPRT_LAST_FRAG;
+       xprt->tcp_reclen &= 0x7fffffff;
+       xprt->tcp_flags &= ~XPRT_COPY_RECM;
+       xprt->tcp_offset = 0;
+       /* Sanity check of the record length */
+       if (xprt->tcp_reclen < 4) {
+               printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+               xprt_disconnect(xprt);
+       }
+       dprintk("RPC:      reading TCP record fragment of length %d\n",
+                       xprt->tcp_reclen);
 }
 
-/*
- * TCP read and complete request
- */
-static inline int
-tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
+static void
+tcp_check_recm(struct rpc_xprt *xprt)
 {
-       int     want, result;
-
-       if (req->rq_rlen <= xprt->tcp_copied || !avail)
-               goto done;
-       want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
-       do {
-               dprintk("RPC: %4d TCP receiving %d bytes\n",
-                       req->rq_task->tk_pid, want);
-
-               result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
-               if (result < 0)
-                       return result;
-               xprt->tcp_copied += result;
-               xprt->tcp_offset += result;
-               avail  -= result;
-               want   -= result;
-       } while (want);
-
- done:
-       if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
-               return avail;
-       dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
-       xprt_complete_rqst(xprt, req, xprt->tcp_copied);
-
-       return avail;
+       if (xprt->tcp_offset == xprt->tcp_reclen) {
+               xprt->tcp_flags |= XPRT_COPY_RECM;
+               xprt->tcp_offset = 0;
+               if (xprt->tcp_flags & XPRT_LAST_FRAG) {
+                       xprt->tcp_flags &= ~XPRT_COPY_DATA;
+                       xprt->tcp_flags |= XPRT_COPY_XID;
+                       xprt->tcp_copied = 0;
+               }
+       }
 }
 
 /*
- * TCP discard extra bytes from a short read
+ * TCP read xid
  */
-static inline int
-tcp_read_discard(struct rpc_xprt *xprt, int avail)
+static inline void
+tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
-       struct iovec    riov;
-       static u8       dummy[64];
-       int             want, result = 0;
-
-       while (avail) {
-               want = min_t(unsigned int, avail, sizeof(dummy));
-               riov.iov_base = dummy;
-               riov.iov_len  = want;
-               dprintk("RPC:      TCP skipping %d bytes\n", want);
-               result = xprt_recvmsg(xprt, &riov, 1, want, 0);
-               if (result < 0)
-                       return result;
-               xprt->tcp_offset += result;
-               avail  -= result;
-       }
-       return avail;
+       size_t len, used;
+       char *p;
+
+       len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
+       dprintk("RPC:      reading XID (%Zu bytes)\n", len);
+       p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
+       used = tcp_copy_data(desc, p, len);
+       xprt->tcp_offset += used;
+       if (used != len)
+               return;
+       xprt->tcp_flags &= ~XPRT_COPY_XID;
+       xprt->tcp_flags |= XPRT_COPY_DATA;
+       xprt->tcp_copied = 4;
+       dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
+       tcp_check_recm(xprt);
 }
 
 /*
- * TCP record receive routine
- * This is not the most efficient code since we call recvfrom thrice--
- * first receiving the record marker, then the XID, then the data.
- * 
- * The optimal solution would be a RPC support in the TCP layer, which
- * would gather all data up to the next record marker and then pass us
- * the list of all TCP segments ready to be copied.
+ * TCP read and complete request
  */
-static int
-tcp_input_record(struct rpc_xprt *xprt)
+static inline void
+tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
-       struct rpc_rqst *req = NULL;
-       struct rpc_task *task = NULL;
-       int             avail, result;
-
-       dprintk("RPC:      tcp_input_record\n");
-
-       if (xprt->shutdown)
-               return -EIO;
-       if (!xprt_connected(xprt))
-               return -ENOTCONN;
-
-       /* Read in a new fragment marker if necessary */
-       /* Can we ever really expect to get completely empty fragments? */
-       if ((result = tcp_read_fraghdr(xprt)) < 0)
-               return result;
-       avail = result;
-
-       /* Read in the xid if necessary */
-       if ((result = tcp_read_xid(xprt, avail)) < 0)
-               return result;
-       if (!(avail = result))
-               goto out_ok;
+       struct rpc_rqst *req;
+       struct iovec *iov;
+       char *p;
+       unsigned long skip;
+       size_t len, used;
+       int n;
 
        /* Find and lock the request corresponding to this xid */
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
-       if (req) {
-               task = req->rq_task;
-               /* Read in the request data */
-               result = tcp_read_request(xprt,  req, avail);
-               rpc_unlock_task(task);
-               if (result < 0)
-                       return result;
-               avail = result;
+       if (!req) {
+               xprt->tcp_flags &= ~XPRT_COPY_DATA;
+               dprintk("RPC:      XID %08x request not found!\n",
+                               xprt->tcp_xid);
+               return;
        }
-
-       /* Skip over any trailing bytes on short reads */
-       if ((result = tcp_read_discard(xprt, avail)) < 0)
-               return result;
-
- out_ok:
-       dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
-                       xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
-       result = xprt->tcp_reclen;
-       xprt->tcp_reclen = 0;
-       xprt->tcp_offset = 0;
-       if (!xprt->tcp_more)
-               xprt->tcp_copied = 0;
-       return result;
-}
-
-/*
- *     TCP task queue stuff
- */
-LIST_HEAD(rpc_xprt_pending);   /* List of xprts having pending tcp requests */
-
-static inline
-void tcp_rpciod_queue(void)
-{
-       rpciod_wake_up();
-}
-
-int xprt_tcp_pending(void)
-{
-       int retval;
-
-       spin_lock_bh(&rpc_queue_lock);
-       retval = !list_empty(&rpc_xprt_pending);
-       spin_unlock_bh(&rpc_queue_lock);
-       return retval;
-}
-
-static inline
-void xprt_append_pending(struct rpc_xprt *xprt)
-{
-       spin_lock_bh(&rpc_queue_lock);
-       if (list_empty(&xprt->rx_pending)) {
-               list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
-               dprintk("RPC:     xprt queue %p\n", xprt);
-               tcp_rpciod_queue();
+       skip = xprt->tcp_copied;
+       iov = req->rq_rvec;
+       for (n = req->rq_rnr; n != 0; n--, iov++) {
+               if (skip >= iov->iov_len) {
+                       skip -= iov->iov_len;
+                       continue;
+               }
+               p = iov->iov_base;
+               len = iov->iov_len;
+               if (skip) {
+                       p += skip;
+                       len -= skip;
+                       skip = 0;
+               }
+               if (xprt->tcp_offset + len > xprt->tcp_reclen)
+                       len = xprt->tcp_reclen - xprt->tcp_offset;
+               used = tcp_copy_data(desc, p, len);
+               xprt->tcp_copied += used;
+               xprt->tcp_offset += used;
+               if (used != len)
+                       break;
+               if (xprt->tcp_copied == req->rq_rlen) {
+                       xprt->tcp_flags &= ~XPRT_COPY_DATA;
+                       break;
+               }
+               if (xprt->tcp_offset == xprt->tcp_reclen) {
+                       if (xprt->tcp_flags & XPRT_LAST_FRAG)
+                               xprt->tcp_flags &= ~XPRT_COPY_DATA;
+                       break;
+               }
        }
-       spin_unlock_bh(&rpc_queue_lock);
-}
 
-static
-void xprt_remove_pending(struct rpc_xprt *xprt)
-{
-       spin_lock_bh(&rpc_queue_lock);
-       if (!list_empty(&xprt->rx_pending)) {
-               list_del(&xprt->rx_pending);
-               INIT_LIST_HEAD(&xprt->rx_pending);
+       if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
+               dprintk("RPC: %4d received reply complete\n",
+                               req->rq_task->tk_pid);
+               xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       rpc_unlock_task(req->rq_task); 
+       tcp_check_recm(xprt);
 }
 
-static inline
-struct rpc_xprt *xprt_remove_pending_next(void)
+/*
+ * TCP discard extra bytes from a short read
+ */
+static inline void
+tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
-       struct rpc_xprt *xprt = NULL;
-
-       spin_lock_bh(&rpc_queue_lock);
-       if (!list_empty(&rpc_xprt_pending)) {
-               xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
-               list_del(&xprt->rx_pending);
-               INIT_LIST_HEAD(&xprt->rx_pending);
-       }
-       spin_unlock_bh(&rpc_queue_lock);
-       return xprt;
+       size_t len;
+
+       len = xprt->tcp_reclen - xprt->tcp_offset;
+       if (len > desc->count)
+               len = desc->count;
+       desc->count -= len;
+       desc->offset += len;
+       xprt->tcp_offset += len;
+       tcp_check_recm(xprt);
 }
 
 /*
- *     This is protected from tcp_data_ready and the stack as its run
- *     inside of the RPC I/O daemon
+ * TCP record receive routine
+ * We first have to grab the record marker, then the XID, then the data.
  */
-void
-__rpciod_tcp_dispatcher(void)
+static int
+tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
+               unsigned int offset, size_t len)
 {
-       struct rpc_xprt *xprt;
-       int safe_retry = 0, result;
-
-       dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
-       /*
-        *      Empty each pending socket
-        */
-       while ((xprt = xprt_remove_pending_next()) != NULL) {
-               dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
-               do {
-                       result = tcp_input_record(xprt);
-               } while (result >= 0);
+       struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
+       skb_reader_t desc = { skb, offset, len };
 
-               if (safe_retry++ > 200) {
-                       schedule();
-                       safe_retry = 0;
+       dprintk("RPC:      tcp_data_recv\n");
+       do {
+               /* Read in a new fragment marker if necessary */
+               /* Can we ever really expect to get completely empty fragments? */
+               if (xprt->tcp_flags & XPRT_COPY_RECM) {
+                       tcp_read_fraghdr(xprt, &desc);
+                       continue;
                }
-       }
+               /* Read in the xid if necessary */
+               if (xprt->tcp_flags & XPRT_COPY_XID) {
+                       tcp_read_xid(xprt, &desc);
+                       continue;
+               }
+               /* Read in the request data */
+               if (xprt->tcp_flags & XPRT_COPY_DATA) {
+                       tcp_read_request(xprt, &desc);
+                       continue;
+               }
+               /* Skip over any trailing bytes on short reads */
+               tcp_read_discard(xprt, &desc);
+       } while (desc.count && xprt_connected(xprt));
+       dprintk("RPC:      tcp_data_recv done\n");
+       return len - desc.count;
 }
 
-/*
- *     data_ready callback for TCP. We can't just jump into the
- *     tcp recvmsg functions inside of the network receive bh or
- *     bad things occur. We queue it to pick up after networking
- *     is done.
- */
-static void tcp_data_ready(struct sock *sk, int len)
+static void tcp_data_ready(struct sock *sk, int bytes)
 {
-       struct rpc_xprt *xprt;
+       struct rpc_xprt *xprt;
+       read_descriptor_t rd_desc;
 
        dprintk("RPC:      tcp_data_ready...\n");
-       if (!(xprt = xprt_from_sock(sk)))
-       {
-               printk("Not a socket with xprt %p\n", sk);
-               goto out;
+       if (!(xprt = xprt_from_sock(sk))) {
+               printk("RPC:      tcp_data_ready socket info not found!\n");
+               return;
        }
-
        if (xprt->shutdown)
-               goto out;
-
-       xprt_append_pending(xprt);
+               return;
 
-       dprintk("RPC:      tcp_data_ready client %p\n", xprt);
-       dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
-                               sk->state, xprt_connected(xprt),
-                               sk->dead, sk->zapped);
- out:
-       if (sk->sleep && waitqueue_active(sk->sleep))
-               wake_up_interruptible(sk->sleep);
+       /* We use rd_desc to pass struct xprt to tcp_data_recv */
+       rd_desc.buf = (char *)xprt;
+       rd_desc.count = 65536;
+       tcp_read_sock(sk, &rd_desc, tcp_data_recv);
 }
 
-
 static void
 tcp_state_change(struct sock *sk)
 {
@@ -1483,8 +1365,6 @@ xprt_setup(struct socket *sock, int proto,
        req->rq_next = NULL;
        xprt->free = xprt->slot;
 
-       INIT_LIST_HEAD(&xprt->rx_pending);
-
        dprintk("RPC:      created transport %p\n", xprt);
        
        xprt_bind_socket(xprt, sock);