#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <sys/sem.h>
#include "mqshm.h"
// Size in bytes of one link field (link_t) in the item chain.
#define LINKER_SIZE (sizeof(link_t))
// Fall back to the standard assert() when no ASSERT macro has been defined yet.
#ifndef ASSERT
#include <assert.h>
#define ASSERT(exp) assert(exp)
#endif
// Number of slots in MQShmPriv::_pids, i.e. how many processes can be
// registered as status waiters at the same time.
const int SHM_MAX_PIDS = 8;
// Argument union for the fourth parameter of semctl().
// Which member is read depends on the command passed to semctl().
union semun
{
    int              val;    // used with SETVAL
    struct semid_ds *buf;    // used with IPC_STAT and IPC_SET
    unsigned short  *array;  // used with GETALL and SETALL
    struct seminfo  *__buf;  // used with IPC_INFO (Linux specific)
};
// signal handler

// Flag raised by the handler when SIGALRM is delivered.
// It is written from a signal handler and read by normal code, so it is
// declared volatile sig_atomic_t, the object type intended for that use.
static volatile sig_atomic_t isTimeout = 0;

// Handler installed for both SIGALRM and SIGUSR1.
// SIGALRM marks the timeout as expired; for any other signal the handler
// does nothing (its delivery alone is enough to interrupt pause()).
static void catchSignal(int sig)
{
    if (sig == SIGALRM)
        isTimeout = 1;
}
// start timer
// Installs catchSignal as the SIGALRM handler, clears the timeout flag and
// arms alarm() for `timeout` seconds.
// Note: alarm(0) only cancels a pending alarm, so a timeout of 0 arms nothing.
static void startTimer(int timeout)
{
    struct sigaction newAction;
    // Zero the whole structure and explicitly empty sa_mask: an uninitialized
    // mask would block an arbitrary set of signals while the handler runs.
    memset(&newAction, 0, sizeof(newAction));
    newAction.sa_handler = catchSignal;
    sigemptyset(&newAction.sa_mask);
    newAction.sa_flags = 0; // no SA_RESTART, so blocking calls such as pause() are interrupted
    sigaction(SIGALRM, &newAction, NULL);
    // Clear the flag before arming the alarm so an expiry can never be overwritten.
    isTimeout = false;
    alarm(timeout);
}
// reset timer
// Cancels a pending alarm (if any) and clears the timeout flag.
static void stopTimer()
{
// alarm(0) cancels any previously scheduled alarm.
alarm(0);
// NOTE(review): with both action pointers NULL this call neither changes nor
// reports the SIGALRM disposition, so the handler installed by startTimer()
// stays in place -- confirm whether restoring the previous action was intended.
sigaction(SIGALRM, NULL, NULL);
isTimeout = false;
}
// Installs catchSignal as the SIGUSR1 handler. setStatus() sends SIGUSR1 to
// registered waiters; the handler ignores it, the delivery just interrupts pause().
static void startWaitStatus()
{
    struct sigaction newAction;
    // Zero the whole structure and explicitly empty sa_mask: an uninitialized
    // mask would block an arbitrary set of signals while the handler runs.
    memset(&newAction, 0, sizeof(newAction));
    newAction.sa_handler = catchSignal;
    sigemptyset(&newAction.sa_mask);
    newAction.sa_flags = 0;
    sigaction(SIGUSR1, &newAction, NULL);
}
// Counterpart of startWaitStatus().
// NOTE(review): with both action pointers NULL, sigaction() neither changes nor
// reports the SIGUSR1 disposition, so the handler installed by
// startWaitStatus() remains active after this call -- confirm the intent.
static void stopWaitStatus()
{
sigaction(SIGUSR1, NULL, NULL);
}
// Link field helpers

// Returns the value stored in a link field with the ALIGN_MASK bits cleared.
static inline link_t GET_LK_V(link_t *linkp)
{
    const link_t raw = *linkp;
    return raw & ~ALIGN_MASK;
}
// Stores `linkv` into a link field while keeping the ALIGN_MASK bits that are
// already present in the field.
static inline void SET_LK_V(link_t *linkp, link_t linkv)
{
    const link_t flags = static_cast<link_t>(*linkp & ALIGN_MASK);
    *linkp = static_cast<link_t>(flags + linkv);
}
// MQShmPriv
// MQShm header structure, located at the very start of the shared memory
// segment (the constructor casts _shm to MQShmPriv *).
// use only byte and word elements for ARM word alignment
// NOTE(review): _pids is declared as int, which does not match the rule
// above -- confirm the intended layout before changing any member.
// NOTE(review): begin() and append() address the last LINKER_SIZE bytes of
// this struct as the first link field, which presumably is _header; that
// relies on _header staying the last member with no trailing padding -- verify.
struct MQShmPriv
{
int _pids[SHM_MAX_PIDS]; // PIDs of processes waiting for a status change (0 = free slot)
ushort _placeholder[1]; // placeholder for DWORD alignment
ushort _status; // status of this shm
ushort _cx; // user defined counter
ushort _nItems; // how many items in the shm
link_t _offset; // offset of the free area, i.e. where the next item is written
link_t _header; // item list header
};
// Accessors for the waiter PID table in the shared header.
// Both expect a pointer named `_priv` to be in scope at the expansion site.
// Macro arguments are parenthesized so expression arguments expand safely.
#define SHM_GET_PID(i) \
    (_priv->_pids[(i)])
#define SHM_SET_PID(i, pid) \
    (_priv->_pids[(i)] = (pid))
// MQShm members

// Acquires the semaphore that guards the waiter PID table (decrement by one,
// with SEM_UNDO so the kernel reverts it if the process dies).
// A semop() interrupted by a signal (EINTR) is retried: this class itself
// delivers SIGUSR1/SIGALRM to waiting processes, so an interruption is an
// expected event and must not terminate the process.
// Any other failure is reported with perror() and the process exits.
void MQShm::pidLock()
{
    struct sembuf sembuf;
    sembuf.sem_num = 0;
    sembuf.sem_op = -1;
    sembuf.sem_flg = SEM_UNDO;
    while (semop(_pidSemId, &sembuf, 1) == -1) {
        if (errno == EINTR)
            continue; // interrupted by a signal handler, try again
        perror("semop (pid semphone)");
        exit(1);
    }
}
// Releases the semaphore that guards the waiter PID table (increment by one,
// with SEM_UNDO). On failure the error is printed and the process exits.
void MQShm::pidUnlock()
{
    struct sembuf release;
    release.sem_flg = SEM_UNDO;
    release.sem_op = +1;
    release.sem_num = 0;

    if (semop(_pidSemId, &release, 1) != -1)
        return;

    perror("semop (pid semphone)");
    exit(1);
}
// Creates or attaches to the shared memory segment identified by `key` and
// obtains the two semaphores that go with it (`key` for the segment data,
// `key + 1` for the waiter PID table).
//
// key  - System V IPC key of the segment.
// size - size of the segment in bytes.
// init - when true, (re)initialize the header and both semaphores even if
//        the segment already existed.
//
// The header and the semaphores are initialized when this process created
// the segment or when `init` is set. Any unrecoverable IPC failure is
// reported with perror() and terminates the process with exit(1).
MQShm::MQShm(key_t key, size_t size, bool init)
{
    // The low nine bits of the shmget()/semget() flag argument are permission
    // bits. O_RDWR is an open(2) flag, not a permission mask; using it here
    // only set "write for others", leaving the owner without access.
    // Owner read/write is requested explicitly instead.
    const int ipcPerms = 0600;

    bool isCreater = true;
    _size = size;
    _myPidIdx = -1;

    // create shm
    if ((_shmId = shmget(key, _size, ipcPerms | IPC_CREAT | IPC_EXCL)) == -1) {
        // Exclusive creation failed (typically because the segment exists):
        // report it and fall back to attaching to the existing segment.
        char msg[32];
        snprintf(msg, sizeof(msg), "shmget (0x%x)", (unsigned int)key);
        perror(msg);
        isCreater = false;
        if ((_shmId = shmget(key, _size, ipcPerms | IPC_CREAT)) == -1) {
            perror(msg);
            exit(1);
        }
    }
    if ((_shm = (uchar *)shmat(_shmId, NULL, 0)) == (void *)-1) {
        perror("shmat");
        exit(1);
    }

    // create shm semphone
    if ((_shmSemId = semget(key, 1, ipcPerms | IPC_CREAT)) == -1) {
        perror("semget (shm semphone)");
        exit(1);
    }

    // create pid semphone
    if ((_pidSemId = semget(key + 1, 1, ipcPerms | IPC_CREAT)) == -1) {
        perror("semget (pid semphone)");
        exit(1);
    }

    // The header lives at the very beginning of the segment.
    _priv = (MQShmPriv *)_shm;
    if (isCreater || init) {
        printf("to initialize the MQShm (0x%x)\n", _shmId);

        // initialize the shm by creater
        memset(_priv->_pids, 0, sizeof(_priv->_pids));
        memset(_priv->_placeholder, 0XCC, sizeof(_priv->_placeholder));
        _priv->_status = MQShm::StUnknown;
        _priv->_cx = 0;
        _priv->_nItems = 0;
        _priv->_offset = sizeof(MQShmPriv); // first item goes right after the header
        _priv->_header = MQShm::null;

        // initialize shm and pid semphone: value 1 means "unlocked"
        union semun data;
        data.val = 1;
        if (semctl(_shmSemId, 0, SETVAL, data) == -1) {
            perror("semctl (shm semphone)");
            exit(1);
        }
        if (semctl(_pidSemId, 0, SETVAL, data) == -1) {
            perror("semctl (pid semphone)");
            exit(1);
        }
    }
}
// Detaches the shared memory segment from this process. The segment and the
// semaphores themselves are left in place (close() is what removes them).
MQShm::~MQShm()
{
    // close() already detaches the segment and sets _shm to NULL; calling
    // shmdt(NULL) afterwards would only fail and print a spurious error.
    if (_shm == NULL)
        return;
    if (shmdt(_shm) == -1)
        perror("shmdt");
}
// Detaches the segment, then removes the segment and both semaphores from
// the system. Each failing step is reported with perror() but does not stop
// the remaining steps. Afterwards all ids and pointers are invalidated.
void MQShm::close()
{
    printf("close MQShm (0x%x)\n", _shmId);

    const int detachRc = shmdt(_shm);
    if (detachRc == -1) {
        perror("shmdt");
    }

    const int removeShmRc = shmctl(_shmId, IPC_RMID, NULL);
    if (removeShmRc == -1) {
        perror("shmctl");
    }

    const int removeShmSemRc = semctl(_shmSemId, 0, IPC_RMID);
    if (removeShmSemRc == -1) {
        perror("semctl (shm semphone)");
    }

    const int removePidSemRc = semctl(_pidSemId, 0, IPC_RMID);
    if (removePidSemRc == -1) {
        perror("semctl (pid semphone)");
    }

    // Invalidate every handle so later use is detectable.
    _shmId = -1;
    _shmSemId = -1;
    _pidSemId = -1;
    _shm = NULL;
    _priv = NULL;
}
// Acquires the semaphore that guards the segment contents (decrement by one,
// with SEM_UNDO so the kernel reverts it if the process dies).
// A semop() interrupted by a signal (EINTR) is retried, because signals are
// part of this class's normal operation (SIGUSR1 from setStatus(), SIGALRM
// from the wait timer). Any other failure is printed and the process exits.
void MQShm::lock()
{
    struct sembuf sembuf;
    sembuf.sem_num = 0;
    sembuf.sem_op = -1;
    sembuf.sem_flg = SEM_UNDO;
    while (semop(_shmSemId, &sembuf, 1) == -1) {
        if (errno == EINTR)
            continue; // interrupted by a signal handler, try again
        perror("semop");
        exit(1);
    }
}
// Releases the semaphore that guards the segment contents (increment by one,
// with SEM_UNDO). Debug builds first assert that the semaphore is currently
// held. On failure the error is printed and the process exits.
void MQShm::unlock()
{
#ifndef NDEBUG
    ASSERT(locked());
#endif
    struct sembuf release;
    release.sem_flg = SEM_UNDO;
    release.sem_op = +1;
    release.sem_num = 0;

    if (semop(_shmSemId, &release, 1) != -1)
        return;

    perror("semop");
    exit(1);
}
// Reports whether the segment semaphore is currently taken, i.e. whether its
// value is zero or less. A failing semctl() is printed and the process exits.
bool MQShm::locked()
{
    const int current = semctl(_shmSemId, 0, GETVAL);
    if (current == -1) {
        perror("semctl");
        exit(1);
    }
    return !(current > 0);
}
// Returns how many payload bytes the next append() may store.
// Besides the payload, an item needs room for its trailing link field
// (LINKER_SIZE) and for one possible alignment byte, so both are reserved.
// Returns 0 when the reserved part alone no longer fits; the plain
// subtraction would wrap around in unsigned arithmetic and report a huge
// amount of free space for a nearly full segment.
uint MQShm::freeSpace() const
{
    // the free space must contain a byte for 2-byte alignment
    const size_t reserved = static_cast<size_t>(_priv->_offset) + LINKER_SIZE + 1;
    if (reserved >= _size)
        return 0;
    return static_cast<uint>(_size - reserved);
}
// Number of items currently recorded in the shared header.
uint MQShm::itemCount() const
{
    const uint count = _priv->_nItems;
    return count;
}
void MQShm::clrCounter()
{
_priv->_cx = 0;
}
// Current value of the user defined counter in the shared header.
uint MQShm::getCounter() const
{
    const uint counter = _priv->_cx;
    return counter;
}
// Stores `n` in the user defined counter of the shared header.
// The header field is a ushort, so larger values are truncated on assignment.
void MQShm::setCounter(uint n)
{
    this->_priv->_cx = n;
}
// Current status value stored in the shared header.
int MQShm::status() const
{
    const int current = _priv->_status;
    return current;
}
// Stores the new status in the shared header and sends SIGUSR1 to every
// process registered in the waiter PID table, then sleeps briefly.
void MQShm::setStatus(int st)
{
    _priv->_status = st;

    pidLock();
    for (int slot = 0; slot != SHM_MAX_PIDS; ++slot) {
        const int waiter = SHM_GET_PID(slot);
        if (waiter == 0)
            continue; // empty slot
        kill(waiter, SIGUSR1);
    }
    pidUnlock();

    // a period for waking up process who block on the status
    usleep(50000);
}
// Blocks until the shared status has at least one of the bits in `st` set,
// or until `timeout` seconds have elapsed.
//
// st      - bit mask of the status values to wait for.
// timeout - timeout in seconds, handed to alarm() via startTimer().
//
// Returns true when the status matched, false on timeout or when no slot in
// the waiter PID table was available (release builds; debug builds assert).
//
// While waiting, the process registers its PID in the waiter table so that
// setStatus() can wake it with SIGUSR1; the slot is freed before returning.
// NOTE(review): the status is tested before pause() without blocking the
// signals, so a SIGUSR1 delivered between the test and pause() is presumably
// missed until the next signal or the timeout -- confirm whether that matters.
bool MQShm::waitStatus(int st, uint timeout)
{
    if (_priv->_status & st)
        return true;

    bool result = true;
    startWaitStatus();
    startTimer(timeout);
    while (!(_priv->_status & st) && result) {
        if (_myPidIdx == -1) {
            // Register this process in the first free slot of the table.
            pidLock();
            int i = 0;
            while (i < SHM_MAX_PIDS && SHM_GET_PID(i) != 0)
                ++i;
            ASSERT(i < SHM_MAX_PIDS);
            if (i >= SHM_MAX_PIDS) {
                // Table full. With assertions compiled out the original code
                // would write past the end of _pids and corrupt the shared
                // header; give up on waiting instead.
                pidUnlock();
                result = false;
                break;
            }
            SHM_SET_PID(i, getpid());
            _myPidIdx = i;
            pidUnlock();
        }
        // Sleep until a signal arrives (SIGUSR1 from setStatus or SIGALRM).
        pause();
        if (isTimeout)
            result = false;
    }
    stopWaitStatus();
    stopTimer();

    // Free the slot taken above, if any.
    if (_myPidIdx != -1) {
        pidLock();
        SHM_SET_PID(_myPidIdx, 0);
        _myPidIdx = -1;
        pidUnlock();
    }
    return result;
}
// Appends one item of `len` bytes to the end of the item chain.
//
// data - bytes to copy into the segment.
// len  - number of bytes; the caller must ensure freeSpace() >= len (only
//        checked by ASSERT, i.e. not in builds where assertions are disabled).
//
// Layout written: the payload, an optional 0xcc padding byte so that the
// following link field starts on an even address, then a link field holding
// MQShm::null. If a padding byte was inserted, ALIGN_MASK is set in that
// link field. The link field preceding the payload is updated to point at
// the new link field.
void MQShm::append(const uchar *data, uint len)
{
    ASSERT(freeSpace() >= len);

    // append the new item
    uchar *p = _shm + _priv->_offset;
    memcpy(p, data, len);
    p += len;

    // do alignment for arm
    // The address is inspected as size_t: casting a pointer to int is
    // rejected or truncating on targets where pointers are wider than int.
    bool doAlign = false;
    if (reinterpret_cast<size_t>(p) % 2 != 0) {
        *p++ = 0xcc; // the byte for alignment
        doAlign = true;
    }
    *((link_t *) p) = MQShm::null;
    if (doAlign)
        *((link_t *) p) |= ALIGN_MASK;

    // link the new item
    link_t new_linker = p - _shm;
    SET_LK_V((link_t *)(_shm + _priv->_offset - LINKER_SIZE), new_linker);

    // update offset pointer
    _priv->_offset = new_linker + LINKER_SIZE;

    // update counter
    ++(_priv->_nItems);
}
void MQShm::clear()
{
_priv->_nItems = 0;
_priv->_offset = sizeof(MQShmPriv);
_priv->_header = MQShm::null;
}
// Iterator positioned on the first link field, which occupies the last
// LINKER_SIZE bytes of the header.
MQShm::const_iterator MQShm::begin() const
{
    const size_t headLink = sizeof(MQShmPriv) - LINKER_SIZE;
    return MQShm::const_iterator(this, headLink);
}
// Iterator positioned on the last link field, the one located just before
// the free area. That field is expected to hold MQShm::null.
MQShm::const_iterator MQShm::end() const
{
    const link_t tail = _priv->_offset - LINKER_SIZE;
    link_t *tailLink = (link_t *)(_shm + tail);
    ASSERT(GET_LK_V(tailLink) == MQShm::null);
    return MQShm::const_iterator(this, tail);
}
// MQShm::const_iterator members

// Pointer to the payload of the current item, which starts right after the
// link field the iterator is positioned on.
const uchar *MQShm::const_iterator::data() const
{
    const uchar *linkField = _mqShm->_shm + _linkv;
    return linkField + LINKER_SIZE;
}
// Payload size of the current item: the distance to the next link field,
// minus the current link field and minus one byte when the next link field
// carries the ALIGN_MASK flag.
uint MQShm::const_iterator::size() const
{
    uchar *const base = _mqShm->_shm;
    const link_t nextv = GET_LK_V((link_t *)(base + _linkv));

    uint padding = 0;
    if (*(link_t *)(base + nextv) & ALIGN_MASK)
        padding = 1;

    return nextv - _linkv - LINKER_SIZE - padding;
}
// Advances to the next item by following the offset stored in the current
// link field.
MQShm::const_iterator &MQShm::const_iterator::operator++()
{
    link_t *current = (link_t *)(_mqShm->_shm + _linkv);
    _linkv = GET_LK_V(current);
    return *this;
}
// Two iterators are equal when they refer to the same MQShm object and the
// same link offset.
bool MQShm::const_iterator::operator==(const const_iterator &rhs) const
{
    if (_mqShm != rhs._mqShm)
        return false;
    return _linkv == rhs._linkv;
}
// Two iterators differ when they refer to different MQShm objects or to
// different link offsets.
bool MQShm::const_iterator::operator!=(const const_iterator &rhs) const
{
    const bool same = (_mqShm == rhs._mqShm) && (_linkv == rhs._linkv);
    return !same;
}