goto out_unlock;
}
+ if (((svsk->sk_reserved + serv->sv_bufsz)*2
+ > sock_wspace(svsk->sk_sk))
+ && !test_bit(SK_CLOSE, &svsk->sk_flags)
+ && !test_bit(SK_CONN, &svsk->sk_flags)) {
+ /* Don't enqueue while not enough space for reply */
+ dprintk("svc: socket %p no space, %d > %ld, not enqueued\n",
+ svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz,
+ sock_wspace(svsk->sk_sk));
+ goto out_unlock;
+ }
+
/* Mark socket as busy. It will remain in this state until the
* server has processed all pending data and put the socket back
* on the idle list.
rqstp, rqstp->rq_sock);
rqstp->rq_sock = svsk;
svsk->sk_inuse++;
+ rqstp->rq_reserved = serv->sv_bufsz;
+ svsk->sk_reserved += rqstp->rq_reserved;
wake_up(&rqstp->rq_wait);
} else {
dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
}
+/**
+ * svc_reserve - change the space reserved for the reply to a request.
+ * @rqstp: The request in question
+ * @space: new max space to reserve
+ *
+ * Each request reserves some space on the output queue of the socket
+ * to make sure the reply fits. This function reduces that reserved
+ * space to be the amount of space used already, plus @space.
+ *
+ */
+void svc_reserve(struct svc_rqst *rqstp, int space)
+{
+ space += rqstp->rq_resbuf.len<<2;
+
+ if (space < rqstp->rq_reserved) {
+ struct svc_sock *svsk = rqstp->rq_sock;
+ spin_lock_bh(&svsk->sk_server->sv_lock);
+ svsk->sk_reserved -= (rqstp->rq_reserved - space);
+ rqstp->rq_reserved = space;
+ spin_unlock_bh(&svsk->sk_server->sv_lock);
+
+ svc_sock_enqueue(svsk);
+ }
+}
+
/*
* Release a socket after use.
*/
struct svc_serv *serv = svsk->sk_server;
svc_release_skb(rqstp);
+
+ /* Reset response buffer and release
+ * the reservation.
+ * But first, check that enough space was reserved
+ * for the reply, otherwise we have a bug!
+ */
+ if ((rqstp->rq_resbuf.len<<2) > rqstp->rq_reserved)
+ printk(KERN_ERR "RPC request reserved %d but used %d\n",
+ rqstp->rq_reserved,
+ rqstp->rq_resbuf.len<<2);
+
+ rqstp->rq_resbuf.buf = rqstp->rq_resbuf.base;
+ rqstp->rq_resbuf.len = 0;
+ svc_reserve(rqstp, 0);
rqstp->rq_sock = NULL;
spin_lock_bh(&serv->sv_lock);
msg.msg_control = NULL;
msg.msg_controllen = 0;
- msg.msg_flags = MSG_DONTWAIT;
+ /* This was MSG_DONTWAIT, but I now want it to wait.
+ * The only thing that it would wait for is memory and
+ * if we are fairly low on memory, then we aren't likely
+ * to make much progress anyway.
+ * sk->sndtimeo is set to 30seconds just in case.
+ */
+ msg.msg_flags = 0;
oldfs = get_fs(); set_fs(KERNEL_DS);
len = sock_sendmsg(sock, &msg, buflen);
return len;
}
+/*
+ * Set socket snd and rcv buffer lengths
+ */
+static inline void
+svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
+{
+#if 0
+ mm_segment_t oldfs;
+ oldfs = get_fs(); set_fs(KERNEL_DS);
+ sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+ (char*)&snd, sizeof(snd));
+ sock_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
+ (char*)&rcv, sizeof(rcv));
+#else
+ /* sock_setsockopt limits use to sysctl_?mem_max,
+ * which isn't acceptable. Until that is made conditional
+ * on not having CAP_SYS_RESOURCE or similar, we go direct...
+ * DaveM said I could!
+ */
+ lock_sock(sock->sk);
+ sock->sk->sndbuf = snd * 2;
+ sock->sk->rcvbuf = rcv * 2;
+ sock->sk->userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
+ release_sock(sock->sk);
+#endif
+}
/*
* INET callback when data has been received on the socket.
*/
wake_up_interruptible(sk->sleep);
}
+/*
+ * INET callback when space is newly available on the socket.
+ */
+static void
+svc_write_space(struct sock *sk)
+{
+ struct svc_sock *svsk = (struct svc_sock *)(sk->user_data);
+
+ if (svsk) {
+ dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
+ svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
+ svc_sock_enqueue(svsk);
+ }
+
+ if (sk->sleep && waitqueue_active(sk->sleep)) {
+ printk(KERN_WARNING "RPC svc_write_space: some sleeping on %p\n",
+ svsk);
+ wake_up_interruptible(sk->sleep);
+ }
+}
+
/*
* Receive a datagram from a UDP socket.
*/
svc_udp_init(struct svc_sock *svsk)
{
svsk->sk_sk->data_ready = svc_udp_data_ready;
+ svsk->sk_sk->write_space = svc_write_space;
svsk->sk_recvfrom = svc_udp_recvfrom;
svsk->sk_sendto = svc_udp_sendto;
if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0)))
goto failed;
+ /* make sure that a write doesn't block forever when
+ * low on memory
+ */
+ newsock->sk->sndtimeo = HZ*30;
+
/* Precharge. Data may have arrived on the socket before we
* installed the data_ready callback.
*/
/*
* Send out data on TCP socket.
- * FIXME: Make the sendto call non-blocking in order not to hang
- * a daemon on a dead client. Requires write queue maintenance.
*/
static int
svc_tcp_sendto(struct svc_rqst *rqstp)
dprintk("setting up TCP socket for reading\n");
sk->state_change = svc_tcp_state_change;
sk->data_ready = svc_tcp_data_ready;
+ sk->write_space = svc_write_space;
svsk->sk_reclen = 0;
svsk->sk_tcplen = 0;
+
+ /* sndbuf needs to have room for one request
+ * per thread, otherwise we can stall even when the
+ * network isn't a bottleneck.
+ * rcvbuf just needs to be able to hold a few requests.
+ * Normally they will be removed from the queue
+ * as soon a a complete request arrives.
+ */
+ svc_sock_setbufsize(svsk->sk_sock,
+ svsk->sk_server->sv_nrthreads *
+ svsk->sk_server->sv_bufsz,
+ 3 * svsk->sk_server->sv_bufsz);
}
return 0;
}
+void
+svc_sock_update_bufs(struct svc_serv *serv)
+{
+ /*
+ * The number of server threads has changed. Update
+ * rcvbuf and sndbuf accordingly on all sockets
+ */
+ struct list_head *le;
+
+ spin_lock_bh(&serv->sv_lock);
+ list_for_each(le, &serv->sv_permsocks) {
+ struct svc_sock *svsk =
+ list_entry(le, struct svc_sock, sk_list);
+ struct socket *sock = svsk->sk_sock;
+ if (sock->type == SOCK_DGRAM) {
+ /* udp sockets need large rcvbuf as all pending
+ * requests are still in that buffer.
+ * As outgoing requests do not wait for an
+ * ACK, only a moderate sndbuf is needed
+ */
+ svc_sock_setbufsize(sock,
+ 5 * serv->sv_bufsz,
+ (serv->sv_nrthreads+2)* serv->sv_bufsz);
+ } else if (svsk->sk_sk->state != TCP_LISTEN) {
+ printk(KERN_ERR "RPC update_bufs: permanent sock neither UDP or TCP_LISTEN\n");
+ }
+ }
+ list_for_each(le, &serv->sv_tempsocks) {
+ struct svc_sock *svsk =
+ list_entry(le, struct svc_sock, sk_list);
+ struct socket *sock = svsk->sk_sock;
+ if (sock->type == SOCK_STREAM) {
+ /* See svc_tcp_init above for rationale on buffer sizes */
+ svc_sock_setbufsize(sock,
+ serv->sv_nrthreads *
+ serv->sv_bufsz,
+ 3 * serv->sv_bufsz);
+ } else
+ printk(KERN_ERR "RPC update_bufs: temp sock not TCP\n");
+ }
+ spin_unlock_bh(&serv->sv_lock);
+}
+
/*
* Receive the next request on any socket.
*/
} else if ((svsk = svc_sock_dequeue(serv)) != NULL) {
rqstp->rq_sock = svsk;
svsk->sk_inuse++;
+ rqstp->rq_reserved = serv->sv_bufsz;
+ svsk->sk_reserved += rqstp->rq_reserved;
} else {
/* No data pending. Go to sleep */
svc_serv_enqueue(serv, rqstp);
svsk->sk_sk = inet;
svsk->sk_ostate = inet->state_change;
svsk->sk_odata = inet->data_ready;
+ svsk->sk_owspace = inet->write_space;
svsk->sk_server = serv;
svsk->sk_lastrecv = CURRENT_TIME;
sk->state_change = svsk->sk_ostate;
sk->data_ready = svsk->sk_odata;
+ sk->write_space = svsk->sk_owspace;
spin_lock_bh(&serv->sv_lock);