#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include "rds.h"
/*
 * Initialise the generic part of an incoming message.
 *
 * @inc:   message to initialise
 * @conn:  connection the message is associated with
 * @saddr: source address recorded in the message
 *
 * The message starts with a reference count of one, an empty list linkage
 * and no RDMA cookie.
 */
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
		  __be32 saddr)
{
	/* Origin of the message. */
	inc->i_conn = conn;
	inc->i_saddr = saddr;

	/* No RDMA destination cookie has been seen for this message yet. */
	inc->i_rdma_cookie = 0;

	/* Not linked on any queue; reference count starts at one. */
	INIT_LIST_HEAD(&inc->i_item);
	atomic_set(&inc->i_refcount, 1);
}
EXPORT_SYMBOL_GPL(rds_inc_init);
/*
 * Take an additional reference on an incoming message.
 *
 * The debug line reads the counter before the increment, so it reports the
 * reference count as it was on entry.
 */
static void rds_inc_addref(struct rds_incoming *inc)
{
rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
atomic_inc(&inc->i_refcount);
}
/*
 * Drop one reference on an incoming message.
 *
 * When the count reaches zero the message is handed to the owning
 * transport's inc_free() hook.  It is a BUG for the message to still be
 * linked on a list at that point.
 */
void rds_inc_put(struct rds_incoming *inc)
{
	rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));

	/* Other holders remain: nothing more to do. */
	if (!atomic_dec_and_test(&inc->i_refcount))
		return;

	BUG_ON(!list_empty(&inc->i_item));
	inc->i_conn->c_trans->inc_free(inc);
}
EXPORT_SYMBOL_GPL(rds_inc_put);
/*
 * Account a change of @delta bytes in the socket's queued receive data and
 * update the congestion state for @port in @map accordingly.
 *
 * The socket becomes congested once rs_rcv_bytes exceeds rds_sk_rcvbuf(),
 * and stops being congested only after rs_rcv_bytes falls below half of
 * rds_sk_rcvbuf().  Every state change sets or clears the port's bit in
 * @map and queues congestion updates.
 *
 * @sk is accepted but not used by the body.
 */
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
				  struct rds_cong_map *map,
				  int delta, __be16 port)
{
	int now_congested;

	if (!delta)
		return;

	rs->rs_rcv_bytes += delta;
	now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

	rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
		 "now_cong %d delta %d\n",
		 rs, &rs->rs_bound_addr,
		 ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
		 rds_sk_rcvbuf(rs), now_congested, delta);

	if (rs->rs_congested) {
		/* Already congested: leave only once below half the buffer. */
		if (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2)) {
			rs->rs_congested = 0;
			rds_cong_clear_bit(map, port);
			rds_cong_queue_updates(map);
		}
	} else if (now_congested) {
		/* Just crossed the receive buffer limit. */
		rs->rs_congested = 1;
		rds_cong_set_bit(map, port);
		rds_cong_queue_updates(map);
	}
}
/*
 * Walk the extension headers carried in the message header and act on the
 * ones handled here:
 *
 *  - RDS_EXTHDR_RDMA:      call rds_rdma_unuse() on the given rkey.
 *  - RDS_EXTHDR_RDMA_DEST: build an RDMA cookie from rkey and offset and
 *                          store it in inc->i_rdma_cookie.
 *
 * Any other extension type is skipped.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
	struct rds_header *hdr = &inc->i_hdr;
	unsigned int pos = 0, type, len;
	union {
		struct rds_ext_header_version version;
		struct rds_ext_header_rdma rdma;
		struct rds_ext_header_rdma_dest rdma_dest;
	} buffer;

	for (;;) {
		/* Tell the parser how much room the scratch buffer offers. */
		len = sizeof(buffer);
		type = rds_message_next_extension(hdr, &pos, &buffer, &len);

		if (type == RDS_EXTHDR_NONE)
			return;

		if (type == RDS_EXTHDR_RDMA) {
			rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
		} else if (type == RDS_EXTHDR_RDMA_DEST) {
			inc->i_rdma_cookie = rds_rdma_make_cookie(
					be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
					be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
		}
	}
}
/*
 * Hand a received message to the socket bound to its destination.
 *
 * @conn:  connection the message arrived on
 * @saddr: source address (not used by this function's body)
 * @daddr: destination address, used to look up the bound socket
 * @inc:   the incoming message
 * @gfp:   not used by this function's body
 * @km:    not used by this function's body
 *
 * The message is dropped (only a statistic is bumped) when it is an old
 * retransmission, when no socket is bound to the destination, or when the
 * socket is marked SOCK_DEAD.  Messages to port 0 are answered with a pong
 * when pings are enabled.  Otherwise the message gets an extra reference and
 * is appended to the socket's receive queue.
 */
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_incoming *inc, gfp_t gfp, enum km_type km)
{
struct rds_sock *rs = NULL;
struct sock *sk;
unsigned long flags;
/* Stamp the message with its connection and arrival time. */
inc->i_conn = conn;
inc->i_rx_jiffies = jiffies;
rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
"flags 0x%x rx_jiffies %lu\n", conn,
(unsigned long long)conn->c_next_rx_seq,
inc,
(unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
be32_to_cpu(inc->i_hdr.h_len),
be16_to_cpu(inc->i_hdr.h_sport),
be16_to_cpu(inc->i_hdr.h_dport),
inc->i_hdr.h_flags,
inc->i_rx_jiffies);
/*
 * Drop only when BOTH hold: the sequence number is below the next one
 * expected on this connection, and the sender flagged the message as a
 * retransmission.
 */
if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
(inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
rds_stats_inc(s_recv_drop_old_seq);
goto out;
}
/* Every accepted message moves the expected sequence to its own + 1. */
conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
/* Destination port 0 is treated as a ping when the sysctl allows it. */
if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
rds_stats_inc(s_recv_ping);
rds_send_pong(conn, inc->i_hdr.h_sport);
goto out;
}
/*
 * Look up the receiving socket.  It is released with rds_sock_put() at
 * "out", so the lookup presumably returns it with a reference held —
 * confirm against rds_find_bound().
 */
rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
if (!rs) {
rds_stats_inc(s_recv_drop_no_sock);
goto out;
}
rds_recv_incoming_exthdrs(inc, rs);
sk = rds_rs_to_sk(rs);
/* The SOCK_DEAD test and the queue insertion happen under rs_recv_lock. */
write_lock_irqsave(&rs->rs_recv_lock, flags);
if (!sock_flag(sk, SOCK_DEAD)) {
rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
rds_stats_inc(s_recv_queued);
/* Charge the payload length against the socket's receive accounting. */
rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
be32_to_cpu(inc->i_hdr.h_len),
inc->i_hdr.h_dport);
/* The receive queue holds its own reference on the message. */
rds_inc_addref(inc);
list_add_tail(&inc->i_item, &rs->rs_recv_queue);
__rds_wake_sk_sleep(sk);
} else {
rds_stats_inc(s_recv_drop_dead_sock);
}
write_unlock_irqrestore(&rs->rs_recv_lock, flags);
out:
/* rs is still NULL on the early-exit paths taken before the lookup. */
if (rs)
rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);
/*
 * Make sure *inc refers to a message to deliver.
 *
 * If the caller already holds one (*inc is non-NULL) it is left alone.
 * Otherwise the first entry of the socket's receive queue, if any, is
 * stored in *inc with an extra reference taken under rs_recv_lock.
 *
 * Returns non-zero when *inc is set on return, zero when the queue was
 * empty.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
	unsigned long flags;

	/* Caller still holds a candidate from an earlier call. */
	if (*inc)
		return 1;

	read_lock_irqsave(&rs->rs_recv_lock, flags);
	if (!list_empty(&rs->rs_recv_queue)) {
		*inc = list_entry(rs->rs_recv_queue.next,
				  struct rds_incoming,
				  i_item);
		rds_inc_addref(*inc);
	}
	read_unlock_irqrestore(&rs->rs_recv_lock, flags);

	return *inc != NULL;
}
/*
 * Report whether @inc is still linked on a list, and optionally unlink it.
 *
 * The check runs under the write side of rs_recv_lock.  When @drop is
 * non-zero and the message is still linked, its length is subtracted from
 * the socket's receive accounting, it is unlinked, and one reference is
 * dropped.
 *
 * Returns 1 if the message was linked on entry, 0 otherwise.
 */
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
			    int drop)
{
	struct sock *sk = rds_rs_to_sk(rs);
	unsigned long flags;
	int queued;

	write_lock_irqsave(&rs->rs_recv_lock, flags);

	queued = !list_empty(&inc->i_item);
	if (queued && drop) {
		/* Give the bytes back before letting go of the message. */
		rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
				      -be32_to_cpu(inc->i_hdr.h_len),
				      inc->i_hdr.h_dport);
		list_del_init(&inc->i_item);
		rds_inc_put(inc);
	}

	write_unlock_irqrestore(&rs->rs_recv_lock, flags);

	rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, queued, drop);
	return queued;
}
/*
 * Pull pending RDMA notifications off the socket and deliver them.
 *
 * @rs:     socket whose rs_notify_queue is drained
 * @msghdr: message header to attach RDS_CMSG_RDMA_STATUS control messages
 *          to, or NULL to discard the notifications without reporting them
 *
 * With a @msghdr, at most as many notifications are taken as fit in
 * msg_controllen (but always at least one).  Notifications are first moved
 * to a private list under rs_lock; put_cmsg() is then called with the lock
 * released.  If put_cmsg() fails, the notifications not yet delivered are
 * spliced back onto the socket's queue.
 *
 * Returns 0 on success (including when nothing was queued) or the error
 * returned by put_cmsg().
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
	struct rds_notifier *notifier;
	struct rds_rdma_notify cmsg;
	unsigned int count = 0, max_messages = ~0U;
	unsigned long flags;
	LIST_HEAD(copy);
	int err = 0;

	/*
	 * sizeof(cmsg) bytes are handed to put_cmsg() below.  Clear the whole
	 * object with memset() rather than an "= { 0 }" initialiser so that
	 * any padding inside the structure is zeroed as well.
	 */
	memset(&cmsg, 0, sizeof(cmsg));

	if (msghdr) {
		max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
		/* Buffer too small for even one: still attempt a single one. */
		if (!max_messages)
			max_messages = 1;
	}

	/*
	 * NOTE(review): list_move() inserts at the head of "copy", so the
	 * private list ends up in the reverse of queue order — confirm that
	 * this delivery order is intended.
	 */
	spin_lock_irqsave(&rs->rs_lock, flags);
	while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
		notifier = list_entry(rs->rs_notify_queue.next,
				      struct rds_notifier, n_list);
		list_move(&notifier->n_list, &copy);
		count++;
	}
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (!count)
		return 0;

	while (!list_empty(&copy)) {
		notifier = list_entry(copy.next, struct rds_notifier, n_list);

		if (msghdr) {
			cmsg.user_token = notifier->n_user_token;
			cmsg.status = notifier->n_status;

			err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
				       sizeof(cmsg), &cmsg);
			if (err)
				break;
		}

		list_del_init(&notifier->n_list);
		kfree(notifier);
	}

	/* Anything left was not delivered: return it to the socket's queue. */
	if (!list_empty(&copy)) {
		spin_lock_irqsave(&rs->rs_lock, flags);
		list_splice(&copy, &rs->rs_notify_queue);
		spin_unlock_irqrestore(&rs->rs_lock, flags);
	}

	return err;
}
/*
 * Report pending congestion-update bits to user space.
 *
 * A snapshot of rs->rs_cong_notify is sent as an RDS_CMSG_CONG_UPDATE
 * control message.  Only after put_cmsg() succeeds are exactly the reported
 * bits cleared, under rs_lock, so bits set in the meantime stay pending.
 *
 * Returns 0 on success or the error returned by put_cmsg().
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
	uint64_t notify = rs->rs_cong_notify;
	unsigned long flags;
	int err;

	err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
		       sizeof(notify), &notify);
	if (err)
		return err;

	spin_lock_irqsave(&rs->rs_lock, flags);
	rs->rs_cong_notify &= ~notify;
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	return 0;
}
/*
 * Attach per-message control data to @msg.
 *
 * If the incoming message carries a non-zero RDMA cookie, it is delivered
 * as an RDS_CMSG_RDMA_DEST control message.
 *
 * Returns 0 on success (or when there is nothing to attach), otherwise the
 * error returned by put_cmsg().
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg)
{
	/* No cookie recorded for this message: nothing to report. */
	if (!inc->i_rdma_cookie)
		return 0;

	return put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
			sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
}
/*
 * Receive one item from an RDS socket.
 *
 * @iocb:      not used by this function's body
 * @sock:      socket to receive from
 * @msg:       destination message header (iovec, name and control buffers)
 * @size:      maximum number of payload bytes to copy
 * @msg_flags: MSG_* flags; MSG_DONTWAIT, MSG_OOB, MSG_PEEK and MSG_TRUNC
 *             are examined here
 *
 * Pending RDMA notifications are delivered first, then congestion updates;
 * in both cases nothing else is returned by that call.  Otherwise the first
 * queued message is copied to user space, waiting for one if necessary
 * unless MSG_DONTWAIT is set.
 *
 * Returns the number of bytes copied (or the full message length when the
 * message was truncated and MSG_TRUNC was requested), 0 for MSG_OOB or for
 * a notification-only delivery, or a negative errno: -EAGAIN when
 * non-blocking with nothing queued, -ETIMEDOUT when the receive timeout
 * expired, the wait's error when interrupted, -EFAULT when control data
 * could not be delivered, or the transport's copy error.
 */
int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t size, int msg_flags)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	long timeo;
	int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
	struct sockaddr_in *sin;
	struct rds_incoming *inc = NULL;

	timeo = sock_rcvtimeo(sk, nonblock);

	rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

	/*
	 * Report "no source address" on every path that does not fill one in
	 * below, so the caller never copies back an address this function
	 * did not write.
	 */
	msg->msg_namelen = 0;

	if (msg_flags & MSG_OOB)
		goto out;

	while (1) {
		/* Pending notifications are delivered alone, before any data. */
		if (!list_empty(&rs->rs_notify_queue)) {
			ret = rds_notify_queue_get(rs, msg);
			break;
		}

		if (rs->rs_cong_notify) {
			ret = rds_notify_cong(rs, msg);
			break;
		}

		if (!rds_next_incoming(rs, &inc)) {
			if (nonblock) {
				ret = -EAGAIN;
				break;
			}

			timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					(!list_empty(&rs->rs_notify_queue) ||
					 rs->rs_cong_notify ||
					 rds_next_incoming(rs, &inc)), timeo);
			rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
				 timeo);
			/* Woken with time left (or no timeout at all): re-check. */
			if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
				continue;

			/* Negative: interrupted.  Zero: the timeout expired. */
			ret = timeo;
			if (ret == 0)
				ret = -ETIMEDOUT;
			break;
		}

		rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
			 &inc->i_conn->c_faddr,
			 ntohs(inc->i_hdr.h_sport));
		ret = inc->i_conn->c_trans->inc_copy_to_user(inc, msg->msg_iov,
							     size);
		if (ret < 0)
			break;

		/*
		 * If the message is no longer queued, another receiver got to
		 * it first; drop our reference and try the next one.  Unless
		 * peeking, this call also removes the message from the queue.
		 */
		if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
			rds_inc_put(inc);
			inc = NULL;
			rds_stats_inc(s_recv_deliver_raced);
			continue;
		}

		if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
			if (msg_flags & MSG_TRUNC)
				ret = be32_to_cpu(inc->i_hdr.h_len);
			msg->msg_flags |= MSG_TRUNC;
		}

		if (rds_cmsg_recv(inc, msg)) {
			ret = -EFAULT;
			/*
			 * Leave through the common exit so the reference taken
			 * by rds_next_incoming() is dropped; jumping straight
			 * to "out" would leak it.
			 */
			break;
		}

		rds_stats_inc(s_recv_delivered);

		sin = (struct sockaddr_in *)msg->msg_name;
		if (sin) {
			sin->sin_family = AF_INET;
			sin->sin_port = inc->i_hdr.h_sport;
			sin->sin_addr.s_addr = inc->i_saddr;
			memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
			msg->msg_namelen = sizeof(*sin);
		}
		break;
	}

	if (inc)
		rds_inc_put(inc);

out:
	return ret;
}
void rds_clear_recv_queue(struct rds_sock *rs)
{
struct sock *sk = rds_rs_to_sk(rs);
struct rds_incoming *inc, *tmp;
unsigned long flags;
write_lock_irqsave(&rs->rs_recv_lock, flags);
list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
-be32_to_cpu(inc->i_hdr.h_len),
inc->i_hdr.h_dport);
list_del_init(&inc->i_item);
rds_inc_put(inc);
}
write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}
/*
 * Describe an incoming message in an rds_info_message record and copy the
 * record out through @iter.
 *
 * @inc:   message to describe
 * @iter:  info iterator receiving the record
 * @saddr: source address of the message
 * @daddr: destination address of the message
 * @flip:  when non-zero, report the destination as the local side and the
 *         source as the foreign side; when zero, the other way round
 */
void rds_inc_info_copy(struct rds_incoming *inc,
		       struct rds_info_iterator *iter,
		       __be32 saddr, __be32 daddr, int flip)
{
	struct rds_info_message minfo;

	/*
	 * The whole structure is copied out below.  Zero it first so that any
	 * member or padding not assigned here never exposes stale stack
	 * contents.
	 */
	memset(&minfo, 0, sizeof(minfo));

	minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
	minfo.len = be32_to_cpu(inc->i_hdr.h_len);

	if (flip) {
		minfo.laddr = daddr;
		minfo.faddr = saddr;
		minfo.lport = inc->i_hdr.h_dport;
		minfo.fport = inc->i_hdr.h_sport;
	} else {
		minfo.laddr = saddr;
		minfo.faddr = daddr;
		minfo.lport = inc->i_hdr.h_sport;
		minfo.fport = inc->i_hdr.h_dport;
	}

	rds_info_copy(iter, &minfo, sizeof(minfo));
}