]> git.hungrycats.org Git - linux/commitdiff
[PATCH] PATCH 10/16: NFSD: TCP: rationalise locking in RPC server routines
authorNeil Brown <neilb@cse.unsw.edu.au>
Tue, 26 Feb 2002 06:23:32 +0000 (22:23 -0800)
committerLinus Torvalds <torvalds@penguin.transmeta.com>
Tue, 26 Feb 2002 06:23:32 +0000 (22:23 -0800)
Tidy up SMP locking for svc_sock

sk_lock is not necessary and is now removed.
The only things that were happening under sk_lock but
not the more global sv_lock were testing and setting
some of the flags: sk_busy, sk_conn, sk_data etc.

These have been changed to bits in a flags word which are atomically
set and tested.

Also, by establishing some simple rules about that must
be done after setting these flags, the locking is not needed.

With this patch sk_conn and sk_data are now flags, not counts (sk_data
was already a flag for udp).  They are set if there might be
a connection or data, and only clear when we are sure there aren't
(or when we are about to check if there is).

svc_sock_accepted becomes identical to svc_sock_recieved and
so is discarded in favour of the latter.

sk_rqstp was never used and is now gone.

include/linux/sunrpc/svcsock.h
net/sunrpc/svcsock.c

index 0c565502e2723366add08a7fa8030b33b403dd71..e4bdc96ab6d8b88a5e3a30313ceaeafd56e9a0f1 100644 (file)
@@ -19,17 +19,18 @@ struct svc_sock {
        struct list_head        sk_list;        /* list of all sockets */
        struct socket *         sk_sock;        /* berkeley socket layer */
        struct sock *           sk_sk;          /* INET layer */
-       spinlock_t              sk_lock;
 
        struct svc_serv *       sk_server;      /* service for this socket */
        unsigned char           sk_inuse;       /* use count */
-       unsigned char           sk_busy;        /* enqueued/receiving */
-       unsigned char           sk_conn;        /* conn pending */
-       unsigned char           sk_close;       /* dead or dying */
-       int                     sk_data;        /* data pending */
-       unsigned int            sk_temp : 1,    /* temp socket */
-                               sk_qued : 1,    /* on serv->sk_sockets */
-                               sk_dead : 1;    /* socket closed */
+       unsigned int            sk_flags;
+#define        SK_BUSY         0                       /* enqueued/receiving */
+#define        SK_CONN         1                       /* conn pending */
+#define        SK_CLOSE        2                       /* dead or dying */
+#define        SK_DATA         3                       /* data pending */
+#define        SK_TEMP         4                       /* temp (TCP) socket */
+#define        SK_QUED         5                       /* on serv->sk_sockets */
+#define        SK_DEAD         6                       /* socket closed */
+
        int                     (*sk_recvfrom)(struct svc_rqst *rqstp);
        int                     (*sk_sendto)(struct svc_rqst *rqstp);
 
@@ -40,9 +41,6 @@ struct svc_sock {
        /* private TCP part */
        int                     sk_reclen;      /* length of record */
        int                     sk_tcplen;      /* current read length */
-
-       /* Debugging */
-       struct svc_rqst *       sk_rqstp;
 };
 
 /*
index 646a8c982f0f08be6718758683962508cbe1fd02..288819609ca710deb59c2c366006794187203eb6 100644 (file)
 
 /* SMP locking strategy:
  *
- *     svc_sock->sk_lock and svc_serv->sv_lock protect their
- *     respective structures.
+ *     svc_serv->sv_lock protects most stuff for that service.
+ *
+ *     Some flags can be set to certain values at any time
+ *     providing that certain rules are followed:
+ *
+ *     SK_BUSY  can be set to 0 at any time.  
+ *             svc_sock_enqueue must be called afterwards
+ *     SK_CONN, SK_DATA, can be set or cleared at any time.
+ *             after a set, svc_sock_enqueue must be called.   
+ *             after a clear, the socket must be read/accepted
+ *              if this succeeds, it must be set again.
+ *     SK_CLOSE can set at any time. It is never cleared.
  *
- *     Antideadlock ordering is sk_lock --> sv_lock.
  */
 
 #define RPCDBG_FACILITY        RPCDBG_SVCSOCK
@@ -102,7 +111,6 @@ svc_release_skb(struct svc_rqst *rqstp)
  * Queue up a socket with data pending. If there are idle nfsd
  * processes, wake 'em up.
  *
- * This must be called with svsk->sk_lock held.
  */
 static void
 svc_sock_enqueue(struct svc_sock *svsk)
@@ -110,15 +118,18 @@ svc_sock_enqueue(struct svc_sock *svsk)
        struct svc_serv *serv = svsk->sk_server;
        struct svc_rqst *rqstp;
 
-       /* NOTE: Local BH is already disabled by our caller. */
-       spin_lock(&serv->sv_lock);
+       if (!(svsk->sk_flags &
+             ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)) ))
+               return;
+
+       spin_lock_bh(&serv->sv_lock);
 
        if (!list_empty(&serv->sv_threads) && 
            !list_empty(&serv->sv_sockets))
                printk(KERN_ERR
                        "svc_sock_enqueue: threads and sockets both waiting??\n");
 
-       if (svsk->sk_busy) {
+       if (test_bit(SK_BUSY, &svsk->sk_flags)) {
                /* Don't enqueue socket while daemon is receiving */
                dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
                goto out_unlock;
@@ -128,7 +139,7 @@ svc_sock_enqueue(struct svc_sock *svsk)
         * server has processed all pending data and put the socket back
         * on the idle list.
         */
-       svsk->sk_busy = 1;
+       set_bit(SK_BUSY, &svsk->sk_flags);
 
        if (!list_empty(&serv->sv_threads)) {
                rqstp = list_entry(serv->sv_threads.next,
@@ -147,11 +158,11 @@ svc_sock_enqueue(struct svc_sock *svsk)
        } else {
                dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
                list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
-               svsk->sk_qued = 1;
+               set_bit(SK_QUED, &svsk->sk_flags);
        }
 
 out_unlock:
-       spin_unlock(&serv->sv_lock);
+       spin_unlock_bh(&serv->sv_lock);
 }
 
 /*
@@ -171,49 +182,24 @@ svc_sock_dequeue(struct svc_serv *serv)
 
        dprintk("svc: socket %p dequeued, inuse=%d\n",
                svsk->sk_sk, svsk->sk_inuse);
-       svsk->sk_qued = 0;
+       clear_bit(SK_QUED, &svsk->sk_flags);
 
        return svsk;
 }
 
 /*
- * Having read count bytes from a socket, check whether it
+ * Having read something from a socket, check whether it
  * needs to be re-enqueued.
+ * Note: SK_DATA only gets cleared when a read-attempt finds
+ * no (or insufficient) data.
  */
 static inline void
-svc_sock_received(struct svc_sock *svsk, int count)
+svc_sock_received(struct svc_sock *svsk)
 {
-       spin_lock_bh(&svsk->sk_lock);
-       if ((svsk->sk_data -= count) < 0) {
-               printk(KERN_NOTICE "svc: sk_data negative!\n");
-               svsk->sk_data = 0;
-       }
-       svsk->sk_rqstp = NULL; /* XXX */
-       svsk->sk_busy = 0;
-       if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
-               dprintk("svc: socket %p re-enqueued after receive\n",
-                                               svsk->sk_sk);
-               svc_sock_enqueue(svsk);
-       }
-       spin_unlock_bh(&svsk->sk_lock);
+       clear_bit(SK_BUSY, &svsk->sk_flags);
+       svc_sock_enqueue(svsk);
 }
 
-/*
- * Dequeue a new connection.
- */
-static inline void
-svc_sock_accepted(struct svc_sock *svsk)
-{
-       spin_lock_bh(&svsk->sk_lock);
-        svsk->sk_busy = 0;
-        svsk->sk_conn--;
-        if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
-                dprintk("svc: socket %p re-enqueued after accept\n",
-                                               svsk->sk_sk);
-                svc_sock_enqueue(svsk);
-        }
-       spin_unlock_bh(&svsk->sk_lock);
-}
 
 /*
  * Release a socket after use.
@@ -228,7 +214,7 @@ svc_sock_release(struct svc_rqst *rqstp)
        rqstp->rq_sock = NULL;
 
        spin_lock_bh(&serv->sv_lock);
-       if (!--(svsk->sk_inuse) && svsk->sk_dead) {
+       if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) {
                spin_unlock_bh(&serv->sv_lock);
                dprintk("svc: releasing dead socket\n");
                sock_release(svsk->sk_sock);
@@ -363,11 +349,9 @@ svc_udp_data_ready(struct sock *sk, int count)
        if (!svsk)
                goto out;
        dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
-               svsk, sk, count, svsk->sk_busy);
-       spin_lock_bh(&svsk->sk_lock);
-       svsk->sk_data = 1;
+               svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
+       set_bit(SK_DATA, &svsk->sk_flags);
        svc_sock_enqueue(svsk);
-       spin_unlock_bh(&svsk->sk_lock);
  out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
@@ -385,20 +369,21 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
        u32             *data;
        int             err, len;
 
-       svsk->sk_data = 0;
+       clear_bit(SK_DATA, &svsk->sk_flags);
        while ((skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err)) == NULL) {
-               svc_sock_received(svsk, 0);
+               svc_sock_received(svsk);
                if (err == -EAGAIN)
                        return err;
                /* possibly an icmp error */
                dprintk("svc: recvfrom returned error %d\n", -err);
        }
+       set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
 
        /* Sorry. */
        if (skb_is_nonlinear(skb)) {
                if (skb_linearize(skb, GFP_KERNEL) != 0) {
                        kfree_skb(skb);
-                       svc_sock_received(svsk, 0);
+                       svc_sock_received(svsk);
                        return 0;
                }
        }
@@ -406,13 +391,11 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
        if (skb->ip_summed != CHECKSUM_UNNECESSARY) {
                if ((unsigned short)csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) {
                        skb_free_datagram(svsk->sk_sk, skb);
-                       svc_sock_received(svsk, 0);
+                       svc_sock_received(svsk);
                        return 0;
                }
        }
 
-       /* There may be more data */
-       svsk->sk_data = 1;
 
        len  = skb->len - sizeof(struct udphdr);
        data = (u32 *) (skb->data + sizeof(struct udphdr));
@@ -434,7 +417,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
 
        /* One down, maybe more to go... */
        svsk->sk_sk->stamp = skb->stamp;
-       svc_sock_received(svsk, 0);
+       svc_sock_received(svsk);
 
        return len;
 }
@@ -494,10 +477,8 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
                printk("svc: socket %p: no user data\n", sk);
                goto out;
        }
-       spin_lock_bh(&svsk->sk_lock);
-       svsk->sk_conn++;
+       set_bit(SK_CONN, &svsk->sk_flags);
        svc_sock_enqueue(svsk);
-       spin_unlock_bh(&svsk->sk_lock);
  out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
@@ -518,10 +499,8 @@ svc_tcp_state_change(struct sock *sk)
                printk("svc: socket %p: no user data\n", sk);
                goto out;
        }
-       spin_lock_bh(&svsk->sk_lock);
-       svsk->sk_close = 1;
+       set_bit(SK_CLOSE, &svsk->sk_flags);
        svc_sock_enqueue(svsk);
-       spin_unlock_bh(&svsk->sk_lock);
  out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
@@ -536,10 +515,8 @@ svc_tcp_data_ready(struct sock *sk, int count)
                        sk, sk->user_data);
        if (!(svsk = (struct svc_sock *)(sk->user_data)))
                goto out;
-       spin_lock_bh(&svsk->sk_lock);
-       svsk->sk_data++;
+       set_bit(SK_DATA, &svsk->sk_flags);
        svc_sock_enqueue(svsk);
-       spin_unlock_bh(&svsk->sk_lock);
  out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
@@ -572,13 +549,14 @@ svc_tcp_accept(struct svc_sock *svsk)
        newsock->type = sock->type;
        newsock->ops = ops = sock->ops;
 
+       clear_bit(SK_CONN, &svsk->sk_flags);
        if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) {
-               if (net_ratelimit())
+               if (err != -EAGAIN && net_ratelimit())
                        printk(KERN_WARNING "%s: accept failed (err %d)!\n",
                                   serv->sv_name, -err);
                goto failed;            /* aborted connection or whatever */
        }
-
+       set_bit(SK_CONN, &svsk->sk_flags);
        slen = sizeof(sin);
        err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1);
        if (err < 0) {
@@ -609,11 +587,9 @@ svc_tcp_accept(struct svc_sock *svsk)
        /* Precharge. Data may have arrived on the socket before we
         * installed the data_ready callback. 
         */
-       spin_lock_bh(&newsvsk->sk_lock);
-       newsvsk->sk_data = 1;
-       newsvsk->sk_temp = 1;
+       set_bit(SK_DATA, &newsvsk->sk_flags);
+       set_bit(SK_TEMP, &newsvsk->sk_flags);
        svc_sock_enqueue(newsvsk);
-       spin_unlock_bh(&newsvsk->sk_lock);
 
        if (serv->sv_stats)
                serv->sv_stats->nettcpconn++;
@@ -634,23 +610,25 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
        struct svc_sock *svsk = rqstp->rq_sock;
        struct svc_serv *serv = svsk->sk_server;
        struct svc_buf  *bufp = &rqstp->rq_argbuf;
-       int             len, ready, used;
+       int             len;
 
        dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-                       svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
+               svsk, test_bit(SK_DATA, &svsk->sk_flags),
+               test_bit(SK_CONN, &svsk->sk_flags),
+               test_bit(SK_CLOSE, &svsk->sk_flags));
 
-       if (svsk->sk_close) {
+       if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
                svc_delete_socket(svsk);
                return 0;
        }
 
-       if (svsk->sk_conn) {
+       if (test_bit(SK_CONN, &svsk->sk_flags)) {
                svc_tcp_accept(svsk);
-               svc_sock_accepted(svsk);
+               svc_sock_received(svsk);
                return 0;
        }
 
-       ready = svsk->sk_data;
+       clear_bit(SK_DATA, &svsk->sk_flags);
 
        /* Receive data. If we haven't got the record length yet, get
         * the next four bytes. Otherwise try to gobble up as much as
@@ -694,15 +672,10 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
                 */
                dprintk("svc: incomplete TCP record (%d of %d)\n",
                        len, svsk->sk_reclen);
-               svc_sock_received(svsk, ready);
+               svc_sock_received(svsk);
                return -EAGAIN; /* record not complete */
        }
-       /* if we think there is only one more record to read, but
-        * it is bigger than we expect, then two records must have arrived
-        * together, so pretend we aren't using the record.. */
-       if (len > svsk->sk_reclen && ready == 1)
-               used = 0;
-       else    used = 1;
+       set_bit(SK_DATA, &svsk->sk_flags);
 
        /* Frob argbuf */
        bufp->iov[0].iov_base += 4;
@@ -729,7 +702,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
        svsk->sk_reclen = 0;
        svsk->sk_tcplen = 0;
 
-       svc_sock_received(svsk, used);
+       svc_sock_received(svsk);
        if (serv->sv_stats)
                serv->sv_stats->nettcpcnt++;
 
@@ -738,11 +711,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 error:
        if (len == -EAGAIN) {
                dprintk("RPC: TCP recvfrom got EAGAIN\n");
-               svc_sock_received(svsk, ready); /* Clear data ready */
+               svc_sock_received(svsk);
        } else {
                printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
                                        svsk->sk_server->sv_name, -len);
-               svc_sock_received(svsk, 0);
+               svc_sock_received(svsk);
        }
 
        return len;
@@ -949,7 +922,6 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock,
        svsk->sk_ostate = inet->state_change;
        svsk->sk_odata = inet->data_ready;
        svsk->sk_server = serv;
-       spin_lock_init(&svsk->sk_lock);
 
        /* Initialize the socket */
        if (sock->type == SOCK_DGRAM)
@@ -1045,11 +1017,11 @@ svc_delete_socket(struct svc_sock *svsk)
        spin_lock_bh(&serv->sv_lock);
 
        list_del(&svsk->sk_list);
-       if (svsk->sk_qued)
+       if (test_bit(SK_QUED, &svsk->sk_flags))
                list_del(&svsk->sk_ready);
 
 
-       svsk->sk_dead = 1;
+       set_bit(SK_DEAD, &svsk->sk_flags);
 
        if (!svsk->sk_inuse) {
                spin_unlock_bh(&serv->sv_lock);