Logo Search packages:      
Sourcecode: jabber-aim version File versions  Download package

txqueue.c

/*
 *  aim_txqueue.c
 *
 * Herein lie all the management routines for the transmit (Tx) queue.
 *
 */

#define FAIM_INTERNAL
#include <aim.h>
#include <pth.h>

#ifndef _WIN32
#include <sys/socket.h>
#endif

/*
 * Allocate and initialise a new tx frame.
 *
 * The framing has to agree with the connection type: rendezvous
 * connections (AIM_CONN_TYPE_RENDEZVOUS and
 * AIM_CONN_TYPE_RENDEZVOUS_OUT) only accept AIM_FRAMETYPE_OFT frames,
 * every other connection only accepts AIM_FRAMETYPE_FLAP frames.
 *
 * sess    = session, used here only for debug output
 * conn    = connection the frame is bound to (required)
 * framing = AIM_FRAMETYPE_OFT/FLAP
 * chan    = channel for FLAP, hdrtype for OFT
 * datalen = size of the payload buffer to allocate; no buffer is
 *           attached when this is zero or negative
 *
 * Returns the zero-initialised frame, or NULL when conn is missing,
 * the framing does not fit the connection, or an allocation fails.
 *
 * If/when a pool of transmit frames gets implemented, this becomes
 * the request-an-unused-frame part.
 *
 */
faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu8_t chan, int datalen)
{
      aim_frame_t *frame;
      int is_rendezvous;

      if (conn == NULL) {
            faimdprintf(sess, 0, "aim_tx_new: ERROR: no connection specified\n");
            return NULL;
      }

      /* Sanity: refuse a framing that cannot be used on this connection. */
      is_rendezvous = (conn->type == AIM_CONN_TYPE_RENDEZVOUS) ||
                  (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT);

      if (is_rendezvous && (framing != AIM_FRAMETYPE_OFT)) {
            faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
            return NULL;
      }

      if (!is_rendezvous && (framing != AIM_FRAMETYPE_FLAP)) {
            faimdprintf(sess, 0, "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
            return NULL;
      }

      /* calloc hands back the frame already zeroed */
      frame = (aim_frame_t *)calloc(1, sizeof(aim_frame_t));
      if (frame == NULL)
            return NULL;

      frame->conn = conn;
      frame->hdrtype = framing;

      if (frame->hdrtype == AIM_FRAMETYPE_FLAP) {
            frame->hdr.flap.type = chan;
      } else if (frame->hdrtype == AIM_FRAMETYPE_OFT) {
            frame->hdr.oft.type = chan;
            frame->hdr.oft.hdr2len = 0; /* the caller sets this up later */
      } else {
            faimdprintf(sess, 0, "tx_new: unknown framing\n");
      }

      if (datalen > 0) {
            fu8_t *payload = (unsigned char *)malloc(datalen);

            if (payload == NULL) {
                  aim_frame_destroy(frame);
                  return NULL;
            }

            aim_bstream_init(&frame->data, payload, datalen);
      }

      return frame;
}

/*
 * aim_tx_enqueue__queuebased()
 *
 * Enqueue the passed in frame onto the session's outgoing (tx) queue.
 * Basically...
 *   1) Make sure the frame has a connection, falling back to the
 *      session's BOS connection when it has none
 *   2) Stamp FLAP frames with their sequence number
 *   3) Mark as not-sent-yet
 *   4) Append the frame to the end of sess->queue_outgoing
 *   5) Return
 *
 * The frame itself is linked into the queue (no copy is made).
 *
 * A frame for which no connection can be found at all is destroyed
 * instead of queued, the same way aim_tx_enqueue__immediate() treats
 * it; without a connection there is no sequence number source and the
 * send routines would dereference a NULL connection later on.
 *
 * Note that this is only used when doing queue-based transmitting;
 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
 *
 * Always returns 0.
 *
 */
static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
{

      if (!fr->conn) {
            faimdprintf(sess, 1, "aim_tx_enqueue: WARNING: enqueueing packet with no connecetion\n");
            fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
      }

      /* The BOS fallback may have come up empty as well. */
      if (!fr->conn) {
            faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");
            aim_frame_destroy(fr);
            return 0;
      }

      if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
            /* assign seqnum -- XXX should really not assign until hardxmit */
            fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
      }

      fr->handled = 0; /* not sent yet */

      /* see overhead note in aim_rxqueue counterpart */
      if (!sess->queue_outgoing)
            sess->queue_outgoing = fr;
      else {
            aim_frame_t *cur;

            /* walk to the tail of the list and hang the frame there */
            for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
                  ;
            cur->next = fr;
      }

      return 0;
}

/*
 * aim_tx_enqueue__immediate()
 *
 * Parallel to aim_tx_enqueue__queuebased, except that the queue is
 * bypassed entirely: the frame is handed to aim_tx_sendframe() right
 * here and destroyed afterwards.
 *
 * A frame without a connection is logged and destroyed unsent.
 *
 * Always returns 0; the result of aim_tx_sendframe() is not reported.
 *
 */
static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
{

      if (fr->conn != NULL) {

            /* FLAP frames get their sequence number just before sending */
            if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
                  fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);

            fr->handled = 0; /* not sent yet */

            aim_tx_sendframe(sess, fr);

      } else
            faimdprintf(sess, 1, "aim_tx_enqueue: ERROR: packet has no connection\n");

      /* sent or not, the frame is finished with */
      aim_frame_destroy(fr);

      return 0;
}

/*
 * aim_tx_setenqueue()
 *
 * Choose the routine stored in sess->tx_enqueue.
 *
 * what = AIM_TX_QUEUED    -> aim_tx_enqueue__queuebased
 *        AIM_TX_IMMEDIATE -> aim_tx_enqueue__immediate
 *        AIM_TX_USER      -> the caller supplied func (must not be NULL)
 * func = only looked at for AIM_TX_USER
 *
 * Returns 0 on success, or -EINVAL for an unknown action or a missing
 * func; in the error case sess->tx_enqueue is left as it was.
 *
 */
faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
{
      int (*handler)(aim_session_t *, aim_frame_t *);

      if (what == AIM_TX_QUEUED)
            handler = &aim_tx_enqueue__queuebased;
      else if (what == AIM_TX_IMMEDIATE)
            handler = &aim_tx_enqueue__immediate;
      else if (what == AIM_TX_USER)
            handler = func;
      else
            return -EINVAL; /* unknown action */

      /* only the user supplied handler can be missing */
      if (handler == NULL)
            return -EINVAL;

      sess->tx_enqueue = handler;

      return 0;
}

/*
 * aim_tx_enqueue()
 *
 * Hand a frame to the transmit path and return whatever the chosen
 * enqueue routine returns.
 *
 * Frames bound to a connection that is still in progress
 * (AIM_CONN_STATUS_INPROGRESS) are always forced through the queue
 * based routine. Everything else goes to the routine installed in
 * sess->tx_enqueue.
 *
 */
faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
{
      int inprogress;

      inprogress = fr && fr->conn &&
                  (fr->conn->status & AIM_CONN_STATUS_INPROGRESS);

      if (!inprogress)
            return (*sess->tx_enqueue)(sess, fr);

      return aim_tx_enqueue__queuebased(sess, fr);
}

/*
 *  aim_get_next_txseqnum()
 *
 *   Bumps the connection's tx sequence counter by one and returns the
 *   new value, which is the seqnum to stamp on the next FLAP packet.
 *   The enqueue routines call this as the last step of preparing a
 *   FLAP frame.
 *
 */
faim_internal flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
{
      conn->seqnum += 1;

      return conn->seqnum;
}

/*
 * aim_send()
 *
 * Push count bytes from buf out of socket fd, calling send() again
 * after every short write until nothing is left.
 *
 * Returns the number of bytes sent, which is less than count only if
 * send() reported zero bytes written. Returns -1 as soon as send()
 * fails, regardless of how much went out before.
 *
 */
static int aim_send(int fd, const void *buf, size_t count)
{
      int sent = 0;
      int remaining = count;

      while (remaining) {
            int n = send(fd, ((unsigned char *)buf) + sent, remaining, 0);

            if (n == -1)
                  return -1;
            if (n == 0)
                  break; /* nothing accepted; report what went out so far */

            sent += n;
            remaining -= n;
      }

      return sent;
}

/*
 * aim_bstream_send()
 *
 * Write up to count bytes, starting at the current offset of bs, to
 * the socket of conn, then advance the stream offset past whatever
 * was actually written.
 *
 * count is clamped to what aim_bstream_empty() reports for bs.
 *
 * When the owning session (conn->sessv) has debug >= 2, the bytes
 * that were written are dumped as hex, eight per line.
 *
 * Returns the number of bytes written, -EINVAL when bs or conn is
 * missing, or -1 when the underlying send failed. On failure the
 * stream offset is left where it was; it is never moved backwards.
 *
 */
static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
{
      int wrote = 0;

      /* count is unsigned, so only the pointers need checking */
      if (!bs || !conn)
            return -EINVAL;

      if (count > aim_bstream_empty(bs))
            count = aim_bstream_empty(bs); /* truncate to remaining space */

      if (count)
            wrote = aim_send(conn->fd, bs->data + bs->offset, count);

      if (((aim_session_t *)conn->sessv)->debug >= 2) {
            int i;
            aim_session_t *sess = (aim_session_t *)conn->sessv;

            faimdprintf(sess, 2, "\nOutgoing data: (%d bytes)", wrote);
            for (i = 0; i < wrote; i++) {
                  if (!(i % 8)) 
                        faimdprintf(sess, 2, "\n\t");
                  faimdprintf(sess, 2, "0x%02x ", *(bs->data + bs->offset + i));
            }
            faimdprintf(sess, 2, "\n");
      }

      /*
       * aim_send() hands back -1 on error; adding that to the offset
       * would step the stream backwards, so only consume real progress.
       */
      if (wrote > 0)
            bs->offset += wrote;

      return wrote;     
}

/*
 * sendframe_flap()
 *
 * Serialise a FLAP frame into a scratch buffer and write it to the
 * frame's connection in one go.
 *
 * The wire image is a 6 byte header -- the byte 0x2a, the FLAP type,
 * the sequence number and the payload length -- followed by the
 * payload, i.e. everything in fr->data up to its current position.
 *
 * Once the scratch buffer has been obtained the frame is marked
 * handled and the connection's lastactivity is refreshed whether or
 * not the write succeeded.
 *
 * Returns 0 on success, -ENOMEM if the scratch buffer could not be
 * allocated (the frame is left untouched in that case), or -errno if
 * not everything could be written.
 *
 */
static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
{
      enum { FLAP_HDRLEN = 6 };
      aim_bstream_t wire;
      fu8_t *wirebuf;
      int datalen, wirelen;
      int status = 0;

      datalen = aim_bstream_curpos(&fr->data);

      wirebuf = malloc(FLAP_HDRLEN + datalen);
      if (wirebuf == NULL)
            return -ENOMEM;

      aim_bstream_init(&wire, wirebuf, FLAP_HDRLEN + datalen);

      /* FLAP header */
      aimbs_put8(&wire, 0x2a);
      aimbs_put8(&wire, fr->hdr.flap.type);
      aimbs_put16(&wire, fr->hdr.flap.seqnum);
      aimbs_put16(&wire, datalen);

      /* payload, copied from the start of the frame's data stream */
      aim_bstream_rewind(&fr->data);
      aimbs_putbs(&wire, &fr->data, datalen);

      /* remember how much was assembled, then send it from the top */
      wirelen = aim_bstream_curpos(&wire);
      aim_bstream_rewind(&wire);

      if (aim_bstream_send(&wire, fr->conn, wirelen) != wirelen)
            status = -errno;

      free(wirebuf); /* XXX aim_bstream_free */

      fr->handled = 1;
      fr->conn->lastactivity = time(NULL);

      return status;
}

/*
 * sendframe_oft()
 *
 * Write an OFT frame to its connection: first the header, assembled
 * in a scratch buffer, then the payload straight out of fr->data.
 *
 * The header is the 4 byte magic, the total header length
 * (hdr2len + 8), the OFT type and finally the hdr2len bytes of hdr2.
 * The payload is everything in fr->data up to its current position;
 * it is only sent when the header went out completely and the
 * payload is not empty.
 *
 * Once the scratch buffer has been obtained the frame is marked
 * handled and the connection's lastactivity is refreshed whether or
 * not the writes succeeded.
 *
 * Returns 0 on success, -ENOMEM if the scratch buffer could not be
 * allocated (matching sendframe_flap(); the frame is left untouched
 * in that case), or -errno if a write came up short.
 *
 */
static int sendframe_oft(aim_session_t *sess, aim_frame_t *fr)
{
      aim_bstream_t hbs;
      fu8_t *hbs_raw;
      int hbslen;
      int err = 0;

      hbslen = 8 + fr->hdr.oft.hdr2len;

      if (!(hbs_raw = malloc(hbslen)))
            return -ENOMEM;

      aim_bstream_init(&hbs, hbs_raw, hbslen);

      /* OFT header */
      aimbs_putraw(&hbs, fr->hdr.oft.magic, 4);
      aimbs_put16(&hbs, fr->hdr.oft.hdr2len + 8);
      aimbs_put16(&hbs, fr->hdr.oft.type);
      aimbs_putraw(&hbs, fr->hdr.oft.hdr2, fr->hdr.oft.hdr2len);

      aim_bstream_rewind(&hbs);

      if (aim_bstream_send(&hbs, fr->conn, hbslen) != hbslen) {

            err = -errno;

      } else if (aim_bstream_curpos(&fr->data)) {
            int len;

            /* payload: send from the start up to where writing stopped */
            len = aim_bstream_curpos(&fr->data);
            aim_bstream_rewind(&fr->data);

            if (aim_bstream_send(&fr->data, fr->conn, len) != len)
                  err = -errno;
      }

      free(hbs_raw); /* XXX aim_bstream_free */

      fr->handled = 1;
      fr->conn->lastactivity = time(NULL);

      return err;
}

/*
 * aim_tx_sendframe()
 *
 * Write a single frame to its connection, picking the FLAP or OFT
 * send routine from fr->hdrtype.
 *
 * Returns what that routine returns, or -1 for any other hdrtype.
 *
 */
faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
{
      int rc = -1; /* stays -1 for an unknown framing */

      if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
            rc = sendframe_flap(sess, fr);
      else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
            rc = sendframe_oft(sess, fr);

      return rc;
}

/*
 * aim_tx_flushqueue()
 *
 * Walk the session's outgoing queue and send every frame that has not
 * been handled yet, then purge the handled frames from the queue.
 *
 * Frames whose connection is still in progress
 * (AIM_CONN_STATUS_INPROGRESS) are left in the queue for a later
 * flush. Frames with no connection at all can never be sent -- the
 * send routines need one -- so they are marked handled and disappear
 * with the purge.
 *
 * Before each send the connection's forced latency is honoured by
 * napping while lastactivity + forcedlatency has not passed yet.
 *
 * The result of the individual sends is not reported; always
 * returns 0.
 *
 */
faim_export int aim_tx_flushqueue(aim_session_t *sess)
{
      aim_frame_t *cur;

      for (cur = sess->queue_outgoing; cur; cur = cur->next) {
            time_t now, ready;

            if (cur->handled)
                  continue; /* already been sent */

            if (!cur->conn) {
                  faimdprintf(sess, 1, "aim_tx_flushqueue: dropping queued packet with no connection\n");
                  cur->handled = 1;
                  continue;
            }

            if (cur->conn->status & AIM_CONN_STATUS_INPROGRESS)
                  continue;

            /*
             * And now for the meager attempt to force transmit
             * latency and avoid missed messages.
             *
             * The clock is sampled once, so the duration below
             * cannot turn negative when the second ticks over
             * between the test and the subtraction.
             */
            now = time(NULL);
            ready = cur->conn->lastactivity + cur->conn->forcedlatency;
            if (ready >= now) {
                  /*
                   * XXX Just do this right.
                   *
                   * NOTE(review): the test above works in whole
                   * seconds, yet this naps duration * 1000
                   * microseconds (duration milliseconds).  Confirm
                   * whether pth_time(duration, 0) was intended.
                   */
                  int duration = ready - now;
                  pth_nap(pth_time(0, duration * 1000));
            }

            /* XXX this should call the custom "queuing" function!! */
            aim_tx_sendframe(sess, cur);
      }

      /* purge sent commands from queue */
      aim_tx_purgequeue(sess);

      return 0;
}

/*
 *  aim_tx_purgequeue()
 *
 *  Unlinks every frame marked handled from the session's transmit
 *  queue and destroys it; frames still waiting to be sent stay where
 *  they are, in their original order.
 *
 *  Not a required operation, but it keeps the memory footprint down
 *  at run time.
 *
 */
faim_export void aim_tx_purgequeue(aim_session_t *sess)
{
      /* points at the link (list head or a ->next) being examined */
      aim_frame_t **link = &sess->queue_outgoing;

      while (*link != NULL) {
            aim_frame_t *frame = *link;

            if (!frame->handled) {
                  link = &frame->next;
                  continue;
            }

            /* splice the frame out before it is destroyed */
            *link = frame->next;
            aim_frame_destroy(frame);
      }
}

/**
 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
 * @sess: session whose outgoing queue is scanned
 * @conn: connection that's dying
 *
 * Every queued frame bound to @conn is simply marked as handled, so
 * it is never transmitted and goes away, without warning, the next
 * time the queue is purged.
 *
 */
faim_export void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
{
      aim_frame_t *frame = sess->queue_outgoing;

      while (frame) {
            if (frame->conn == conn)
                  frame->handled = 1;

            frame = frame->next;
      }
}



Generated by  Doxygen 1.6.0   Back to index