MIDAS
Event Buffer Functions (bm_xxx)

Classes

struct  BUFFER_INFO
 
struct  EVENT_DEFRAG_BUFFER
 

Macros

#define MAX_DEFRAG_EVENTS   10
 

Functions

INT bm_match_event (short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
 
void bm_remove_client_locked (BUFFER_HEADER *pheader, int j)
 
static void bm_cleanup_buffer_locked (BUFFER *pbuf, const char *who, DWORD actual_time)
 
static void bm_update_last_activity (DWORD millitime)
 
static BOOL bm_validate_rp (const char *who, const BUFFER_HEADER *pheader, int rp)
 
static int bm_incr_rp_no_check (const BUFFER_HEADER *pheader, int rp, int total_size)
 
static int bm_next_rp (const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
 
static int bm_validate_buffer_locked (const BUFFER *pbuf)
 
static void bm_reset_buffer_locked (BUFFER *pbuf)
 
static void bm_clear_buffer_statistics (HNDLE hDB, BUFFER *pbuf)
 
static void bm_write_buffer_statistics_to_odb_copy (HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
 
static void bm_write_buffer_statistics_to_odb (HNDLE hDB, BUFFER *pbuf, BOOL force)
 
INT bm_open_buffer (const char *buffer_name, INT buffer_size, INT *buffer_handle)
 
INT bm_get_buffer_handle (const char *buffer_name, INT *buffer_handle)
 
INT bm_close_buffer (INT buffer_handle)
 
INT bm_close_all_buffers (void)
 
INT bm_write_statistics_to_odb (void)
 
INT bm_set_cache_size (INT buffer_handle, size_t read_size, size_t write_size)
 
INT bm_compose_event (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
 
INT bm_compose_event_threadsafe (EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD *serial)
 
INT bm_add_event_request (INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
 
INT bm_request_event (HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
 
INT bm_remove_event_request (INT buffer_handle, INT request_id)
 
INT bm_delete_request (INT request_id)
 
static void bm_validate_client_pointers_locked (const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
 
static BOOL bm_update_read_pointer_locked (const char *caller_name, BUFFER_HEADER *pheader)
 
static void bm_wakeup_producers_locked (const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
 
static void bm_dispatch_event (int buffer_handle, EVENT_HEADER *pevent)
 
static void bm_incr_read_cache_locked (BUFFER *pbuf, int total_size)
 
static BOOL bm_peek_read_cache_locked (BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static int bm_peek_buffer_locked (BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
 
static void bm_read_from_buffer_locked (const BUFFER_HEADER *pheader, int rp, std::vector< char > *vecptr, int event_size)
 
static BOOL bm_check_requests (const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static int bm_wait_for_more_events_locked (bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
 
static int bm_fill_read_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
static void bm_convert_event_header (EVENT_HEADER *pevent, int convert_flags)
 
static int bm_wait_for_free_space_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
 
static void bm_write_to_buffer_locked (BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
 
static int bm_find_first_request_locked (BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
 
static void bm_notify_reader_locked (BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
 
INT bm_send_event (INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< char > &event, int timeout_msec)
 
int bm_send_event_vec (int buffer_handle, const std::vector< std::vector< char >> &event, int timeout_msec)
 
static INT bm_flush_cache_locked (bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
 
int bm_send_event_sg (int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
 
static int bm_flush_cache_rpc (int buffer_handle, int timeout_msec)
 
INT bm_flush_cache (int buffer_handle, int timeout_msec)
 
static INT bm_read_buffer (BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
 
static INT bm_receive_event_rpc (INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
 
INT bm_receive_event (INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
 
INT bm_receive_event_alloc (INT buffer_handle, EVENT_HEADER **ppevent, int timeout_msec)
 
INT bm_receive_event_vec (INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
 
static int bm_skip_event (BUFFER *pbuf)
 
INT bm_skip_event (INT buffer_handle)
 
static INT bm_push_buffer (BUFFER *pbuf, int buffer_handle)
 
INT bm_check_buffers ()
 
INT bm_poll_event ()
 
INT bm_empty_buffers ()
 
static INT bm_push_event (const char *buffer_name)
 

Variables

static DWORD _bm_max_event_size = 0
 
static int _bm_lock_timeout = 5 * 60 * 1000
 
static double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
 
static EVENT_DEFRAG_BUFFER defrag_buffer [MAX_DEFRAG_EVENTS]
 

Detailed Description

dox dox


Macro Definition Documentation

◆ MAX_DEFRAG_EVENTS

#define MAX_DEFRAG_EVENTS   10

dox

Definition at line 11239 of file midas.cxx.

Function Documentation

◆ bm_add_event_request()

INT bm_add_event_request ( INT  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
EVENT_HANDLER func,
INT  request_id 
)

dox

Definition at line 8279 of file midas.cxx.

8323 {
8324  if (rpc_is_remote())
8325  return rpc_call(RPC_BM_ADD_EVENT_REQUEST, buffer_handle, event_id,
8326  trigger_mask, sampling_type, (INT) (POINTER_T) func, request_id);
8327 
8328 #ifdef LOCAL_ROUTINES
8329  {
8330  int status = 0;
8331 
8332  BUFFER *pbuf = bm_get_buffer("bm_add_event_request", buffer_handle, &status);
8333 
8334  if (!pbuf)
8335  return status;
8336 
8337  /* lock buffer */
8338  bm_lock_buffer_guard pbuf_guard(pbuf);
8339 
8340  if (!pbuf_guard.is_locked())
8341  return pbuf_guard.get_status();
8342 
8343  /* avoid callback/non callback requests */
8344  if (func == NULL && pbuf->callback) {
8345  pbuf_guard.unlock(); // unlock before cm_msg()
8346  cm_msg(MERROR, "bm_add_event_request", "mixing callback/non callback requests not possible");
8347  return BM_INVALID_MIXING;
8348  }
8349 
8350  /* do not allow GET_RECENT with nonzero cache size */
8351  if (sampling_type == GET_RECENT && pbuf->read_cache_size > 0) {
8352  pbuf_guard.unlock(); // unlock before cm_msg()
8353  cm_msg(MERROR, "bm_add_event_request", "GET_RECENT request not possible if read cache is enabled");
8354  return BM_INVALID_PARAM;
8355  }
8356 
8357  /* get a pointer to the proper client structure */
8358  BUFFER_HEADER *pheader = pbuf->buffer_header;
8359  BUFFER_CLIENT *pclient = bm_get_my_client(pbuf, pheader);
8360 
8361  /* look for a empty request entry */
8362  int i;
8363  for (i = 0; i < MAX_EVENT_REQUESTS; i++)
8364  if (!pclient->event_request[i].valid)
8365  break;
8366 
8367  if (i == MAX_EVENT_REQUESTS) {
8368  // implicit unlock
8369  return BM_NO_MEMORY;
8370  }
8371 
8372  /* setup event_request structure */
8373  pclient->event_request[i].id = request_id;
8374  pclient->event_request[i].valid = TRUE;
8375  pclient->event_request[i].event_id = event_id;
8377  pclient->event_request[i].sampling_type = sampling_type;
8378 
8379  pclient->all_flag = pclient->all_flag || (sampling_type & GET_ALL);
8380 
8381  pbuf->get_all_flag = pclient->all_flag;
8382 
8383  /* set callback flag in buffer structure */
8384  if (func != NULL)
8385  pbuf->callback = TRUE;
8386 
8387  /*
8388  Save the index of the last request in the list so that later only the
8389  requests 0..max_request_index-1 have to be searched through.
8390  */
8391 
8392  if (i + 1 > pclient->max_request_index)
8393  pclient->max_request_index = i + 1;
8394  }
8395 #endif /* LOCAL_ROUTINES */
8396 
8397  return BM_SUCCESS;
8398 }
static BUFFER * bm_get_buffer(const char *who, INT buffer_handle, int *pstatus)
Definition: midas.cxx:6586
static BUFFER_CLIENT * bm_get_my_client(BUFFER *pbuf, BUFFER_HEADER *pheader)
Definition: midas.cxx:5962
#define BM_INVALID_PARAM
Definition: midas.h:625
#define BM_NO_MEMORY
Definition: midas.h:613
#define BM_INVALID_MIXING
Definition: midas.h:627
#define BM_SUCCESS
Definition: midas.h:611
#define GET_ALL
Definition: midas.h:328
#define GET_RECENT
Definition: midas.h:330
#define MERROR
Definition: midas.h:565
INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format,...)
Definition: midas.cxx:917
#define RPC_BM_ADD_EVENT_REQUEST
Definition: mrpc.h:43
bool rpc_is_remote(void)
Definition: midas.cxx:12728
INT rpc_call(DWORD routine_id,...)
Definition: midas.cxx:13630
INT i
Definition: mdump.cxx:35
int INT
Definition: midas.h:129
#define TRUE
Definition: midas.h:182
#define MAX_EVENT_REQUESTS
Definition: midas.h:282
#define POINTER_T
Definition: midas.h:166
#define trigger_mask
Definition: midas_macro.h:233
#define event_id
Definition: midas_macro.h:234
DWORD status
Definition: odbhist.cxx:39
BOOL all_flag
Definition: midas.h:955
EVENT_REQUEST event_request[MAX_EVENT_REQUESTS]
Definition: midas.h:959
INT max_request_index
Definition: midas.h:947
Definition: midas.h:992
BOOL get_all_flag
Definition: midas.h:1014
BOOL callback
Definition: midas.h:1012
BUFFER_HEADER * buffer_header
Definition: midas.h:998
std::atomic< size_t > read_cache_size
Definition: midas.h:1000
short int event_id
Definition: midas.h:935
short int trigger_mask
Definition: midas.h:936
INT sampling_type
Definition: midas.h:937
BOOL valid
Definition: midas.h:934
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_buffers()

INT bm_check_buffers ( void  )

Check if any requested event is waiting in a buffer

Returns
TRUE More events are waiting
FALSE No more events are waiting

Definition at line 10921 of file midas.cxx.

10921  {
10922 #ifdef LOCAL_ROUTINES
10923  {
10924  INT status = 0;
10925  BOOL bMore;
10926  DWORD start_time;
10927  //static DWORD last_time = 0;
10928 
10929  /* if running as a server, buffer checking is done by client
10930  via ASYNC bm_receive_event */
10931  if (rpc_is_mserver()) {
10932  return FALSE;
10933  }
10934 
10935  bMore = FALSE;
10936  start_time = ss_millitime();
10937 
10938  std::vector<BUFFER*> mybuffers;
10939 
10940  gBuffersMutex.lock();
10941  mybuffers = gBuffers;
10942  gBuffersMutex.unlock();
10943 
10944  /* go through all buffers */
10945  for (size_t idx = 0; idx < mybuffers.size(); idx++) {
10946  BUFFER* pbuf = mybuffers[idx];
10947 
10948  if (!pbuf || !pbuf->attached)
10949  continue;
10950 
10951  //int count_loops = 0;
10952  while (1) {
10953  if (pbuf->attached) {
10954  /* one bm_push_event could cause a run stop and a buffer close, which
10955  * would crash the next call to bm_push_event(). So check for valid
10956  * buffer on each call */
10957 
10958  /* this is what happens:
10959  * bm_push_buffer() may call a user callback function
10960  * user callback function may indirectly call bm_close() of this buffer,
10961  * i.e. if it stops the run,
10962  * bm_close() will set pbuf->attached to false, but will not delete pbuf or touch gBuffers
10963  * here we will see pbuf->attached is false and quit this loop
10964  */
10965 
10966  status = bm_push_buffer(pbuf, idx + 1);
10967 
10968  if (status == BM_CORRUPTED) {
10969  return status;
10970  }
10971 
10972  //printf("bm_check_buffers: bm_push_buffer() returned %d, loop %d, time %d\n", status, count_loops, ss_millitime() - start_time);
10973 
10974  if (status != BM_MORE_EVENTS) {
10975  //DWORD t = ss_millitime() - start_time;
10976  //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, no more events\n", idx, start_time - last_time, t, count_loops);
10977  break;
10978  }
10979 
10980  // count_loops++;
10981  }
10982 
10983  // NB: this code has a logic error: if 2 buffers always have data,
10984  // this timeout will cause us to exit reading the 1st buffer
10985  // after 1000 msec, then we read the 2nd buffer exactly once,
10986  // and exit the loop because the timeout is still active -
10987  // we did not reset "start_time" when we started reading
10988  // from the 2nd buffer. Result is that we always read all
10989  // the data in a loop from the 1st buffer, but read just
10990  // one event from the 2nd buffer, resulting in severe unfairness.
10991 
10992  /* stop after one second */
10993  DWORD t = ss_millitime() - start_time;
10994  if (t > 1000) {
10995  //printf("bm_check_buffers: index %d, period %d, elapsed %d, loop %d, timeout.\n", idx, start_time - last_time, t, count_loops);
10996  bMore = TRUE;
10997  break;
10998  }
10999  }
11000  }
11001 
11002  //last_time = start_time;
11003 
11004  return bMore;
11005 
11006  }
11007 #else /* LOCAL_ROUTINES */
11008 
11009  return FALSE;
11010 
11011 #endif
11012 }
#define FALSE
Definition: cfortran.h:309
static INT bm_push_buffer(BUFFER *pbuf, int buffer_handle)
Definition: midas.cxx:10869
#define BM_MORE_EVENTS
Definition: midas.h:626
#define BM_CORRUPTED
Definition: midas.h:629
unsigned int DWORD
Definition: mcstd.h:51
DWORD ss_millitime()
Definition: system.cxx:3332
bool rpc_is_mserver(void)
Definition: midas.cxx:12785
static std::mutex gBuffersMutex
Definition: midas.cxx:197
static std::vector< BUFFER * > gBuffers
Definition: midas.cxx:198
DWORD BOOL
Definition: midas.h:105
std::atomic_bool attached
Definition: midas.h:993
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_check_requests()

static BOOL bm_check_requests ( const BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 8933 of file midas.cxx.

8933  {
8934 
8935  BOOL is_requested = FALSE;
8936  int i;
8937  for (i = 0; i < pc->max_request_index; i++) {
8938  const EVENT_REQUEST *prequest = pc->event_request + i;
8939  if (prequest->valid) {
8940  if (bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
8941  /* check if this is a recent event */
8942  if (prequest->sampling_type == GET_RECENT) {
8943  if (ss_time() - pevent->time_stamp > 1) {
8944  /* skip that event */
8945  continue;
8946  }
8947  }
8948 
8949  is_requested = TRUE;
8950  break;
8951  }
8952  }
8953  }
8954  return is_requested;
8955 }
INT bm_match_event(short int event_id, short int trigger_mask, const EVENT_HEADER *pevent)
Definition: midas.cxx:5978
DWORD ss_time()
Definition: system.cxx:3401
DWORD time_stamp
Definition: midas.h:861
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_cleanup_buffer_locked()

static void bm_cleanup_buffer_locked ( BUFFER pbuf,
const char *  who,
DWORD  actual_time 
)
static

Check all clients on buffer, remove invalid clients

Definition at line 6029 of file midas.cxx.

6029  {
6030  BUFFER_HEADER *pheader;
6031  BUFFER_CLIENT *pbclient;
6032  int j;
6033 
6034  pheader = pbuf->buffer_header;
6035  pbclient = pheader->client;
6036 
6037  /* now check other clients */
6038  for (j = 0; j < pheader->max_client_index; j++, pbclient++) {
6039  if (pbclient->pid) {
6040  if (!ss_pid_exists(pbclient->pid)) {
6041  cm_msg(MINFO, "bm_cleanup",
6042  "Client \'%s\' on buffer \'%s\' removed by %s because process pid %d does not exist", pbclient->name,
6043  pheader->name, who, pbclient->pid);
6044 
6045  bm_remove_client_locked(pheader, j);
6046  continue;
6047  }
6048  }
6049 
6050  /* If client process has no activity, clear its buffer entry. */
6051  if (pbclient->pid && pbclient->watchdog_timeout > 0) {
6052  DWORD tdiff = actual_time - pbclient->last_activity;
6053 #if 0
6054  printf("buffer [%s] client [%-32s] times 0x%08x 0x%08x, diff 0x%08x %5d, timeout %d\n",
6055  pheader->name,
6056  pbclient->name,
6057  pbclient->last_activity,
6058  actual_time,
6059  tdiff,
6060  tdiff,
6061  pbclient->watchdog_timeout);
6062 #endif
6063  if (actual_time > pbclient->last_activity &&
6064  tdiff > pbclient->watchdog_timeout) {
6065 
6066  cm_msg(MINFO, "bm_cleanup", "Client \'%s\' on buffer \'%s\' removed by %s (idle %1.1lfs, timeout %1.0lfs)",
6067  pbclient->name, pheader->name, who,
6068  tdiff / 1000.0,
6069  pbclient->watchdog_timeout / 1000.0);
6070 
6071  bm_remove_client_locked(pheader, j);
6072  }
6073  }
6074  }
6075 }
void bm_remove_client_locked(BUFFER_HEADER *pheader, int j)
Definition: midas.cxx:5998
#define MINFO
Definition: midas.h:566
BOOL ss_pid_exists(int pid)
Definition: system.cxx:1446
DWORD actual_time
Definition: mfe.cxx:38
INT j
Definition: odbhist.cxx:40
DWORD watchdog_timeout
Definition: midas.h:957
DWORD last_activity
Definition: midas.h:956
char name[NAME_LENGTH]
Definition: midas.h:941
char name[NAME_LENGTH]
Definition: midas.h:964
INT max_client_index
Definition: midas.h:966
BUFFER_CLIENT client[MAX_CLIENTS]
Definition: midas.h:973
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_clear_buffer_statistics()

static void bm_clear_buffer_statistics ( HNDLE  hDB,
BUFFER pbuf 
)
static

Definition at line 6372 of file midas.cxx.

6372  {
6373  HNDLE hKey;
6374  int status;
6375 
6376  char str[256 + 2 * NAME_LENGTH];
6377  sprintf(str, "/System/buffers/%s/Clients/%s/writes_blocked_by", pbuf->buffer_name, pbuf->client_name);
6378  //printf("delete [%s]\n", str);
6379  status = db_find_key(hDB, 0, str, &hKey);
6380  if (status == DB_SUCCESS) {
6382  }
6383 }
#define DB_SUCCESS
Definition: midas.h:637
INT db_delete_key(HNDLE hDB, HNDLE hKey, BOOL follow_links)
Definition: odb.cxx:3846
INT db_find_key(HNDLE hDB, HNDLE hKey, const char *key_name, HNDLE *subhKey)
Definition: odb.cxx:4069
HNDLE hKey
Definition: lazylogger.cxx:207
HNDLE hDB
main ODB handle
Definition: mana.cxx:207
INT HNDLE
Definition: midas.h:132
#define NAME_LENGTH
Definition: midas.h:279
char str[256]
Definition: odbhist.cxx:33
char client_name[NAME_LENGTH]
Definition: midas.h:996
char buffer_name[NAME_LENGTH]
Definition: midas.h:997
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_all_buffers()

INT bm_close_all_buffers ( void  )

Close all open buffers

Returns
BM_SUCCESS

Definition at line 7207 of file midas.cxx.

7207  {
7208  if (rpc_is_remote())
7210 
7211 #ifdef LOCAL_ROUTINES
7212  {
7214 
7215  gBuffersMutex.lock();
7216  size_t nbuf = gBuffers.size();
7217  gBuffersMutex.unlock();
7218 
7219  for (size_t i = nbuf; i > 0; i--) {
7220  bm_close_buffer(i);
7221  }
7222 
7223  gBuffersMutex.lock();
7224  for (size_t i=0; i< gBuffers.size(); i++) {
7225  BUFFER* pbuf = gBuffers[i];
7226  if (!pbuf)
7227  continue;
7228  delete pbuf;
7229  pbuf = NULL;
7230  gBuffers[i] = NULL;
7231  }
7232  gBuffersMutex.unlock();
7233  }
7234 #endif /* LOCAL_ROUTINES */
7235 
7236  return BM_SUCCESS;
7237 }
INT bm_close_buffer(INT buffer_handle)
Definition: midas.cxx:7060
int cm_msg_close_buffer(void)
Definition: midas.cxx:489
#define RPC_BM_CLOSE_ALL_BUFFERS
Definition: mrpc.h:38
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_close_buffer()

INT bm_close_buffer ( INT  buffer_handle)

Closes an event buffer previously opened with bm_open_buffer().

Parameters
buffer_handle — buffer handle
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 7060 of file midas.cxx.

7060  {
7061  //printf("bm_close_buffer: handle %d\n", buffer_handle);
7062 
7063  if (rpc_is_remote())
7064  return rpc_call(RPC_BM_CLOSE_BUFFER, buffer_handle);
7065 
7066 #ifdef LOCAL_ROUTINES
7067  {
7068  int status = 0;
7069 
7070  BUFFER *pbuf = bm_get_buffer(NULL, buffer_handle, &status);
7071 
7072  if (!pbuf)
7073  return status;
7074 
7075  //printf("bm_close_buffer: handle %d, name [%s]\n", buffer_handle, pheader->name);
7076 
7077  int i;
7078 
7079  { /* delete all requests for this buffer */
7080  _request_list_mutex.lock();
7081  std::vector<EventRequest> request_list_copy = _request_list;
7082  _request_list_mutex.unlock();
7083  for (size_t i = 0; i < request_list_copy.size(); i++) {
7084  if (request_list_copy[i].buffer_handle == buffer_handle) {
7086  }
7087  }
7088  }
7089 
7090  HNDLE hDB;
7092 
7093  if (hDB) {
7094  /* write statistics to odb */
7096  }
7097 
7098  /* lock buffer in correct order */
7099 
7101 
7102  if (status != BM_SUCCESS) {
7103  return status;
7104  }
7105 
7107 
7108  if (status != BM_SUCCESS) {
7109  pbuf->read_cache_mutex.unlock();
7110  return status;
7111  }
7112 
7113  bm_lock_buffer_guard pbuf_guard(pbuf);
7114 
7115  if (!pbuf_guard.is_locked()) {
7116  pbuf->write_cache_mutex.unlock();
7117  pbuf->read_cache_mutex.unlock();
7118  return pbuf_guard.get_status();
7119  }
7120 
7121  BUFFER_HEADER *pheader = pbuf->buffer_header;
7122 
7123  /* mark entry in _buffer as empty */
7124  pbuf->attached = false;
7125 
7126  BUFFER_CLIENT* pclient = bm_get_my_client(pbuf, pheader);
7127 
7128  if (pclient) {
7129  /* clear entry from client structure in buffer header */
7130  memset(pclient, 0, sizeof(BUFFER_CLIENT));
7131  }
7132 
7133  /* calculate new max_client_index entry */
7134  for (i = MAX_CLIENTS - 1; i >= 0; i--)
7135  if (pheader->client[i].pid != 0)
7136  break;
7137  pheader->max_client_index = i + 1;
7138 
7139  /* count new number of clients */
7140  int j = 0;
7141  for (i = MAX_CLIENTS - 1; i >= 0; i--)
7142  if (pheader->client[i].pid != 0)
7143  j++;
7144  pheader->num_clients = j;
7145 
7146  int destroy_flag = (pheader->num_clients == 0);
7147 
7148  // we hold the locks on the read cache and the write cache.
7149 
7150  /* free cache */
7151  if (pbuf->read_cache_size > 0) {
7152  free(pbuf->read_cache);
7153  pbuf->read_cache = NULL;
7154  pbuf->read_cache_size = 0;
7155  pbuf->read_cache_rp = 0;
7156  pbuf->read_cache_wp = 0;
7157  }
7158 
7159  if (pbuf->write_cache_size > 0) {
7160  free(pbuf->write_cache);
7161  pbuf->write_cache = NULL;
7162  pbuf->write_cache_size = 0;
7163  pbuf->write_cache_rp = 0;
7164  pbuf->write_cache_wp = 0;
7165  }
7166 
7167  /* check if anyone is waiting and wake him up */
7168 
7169  for (int i = 0; i < pheader->max_client_index; i++) {
7170  BUFFER_CLIENT *pclient = pheader->client + i;
7171  if (pclient->pid && (pclient->write_wait || pclient->read_wait))
7172  ss_resume(pclient->port, "B ");
7173  }
7174 
7175  /* unmap shared memory, delete it if we are the last */
7176 
7177  ss_shm_close(pbuf->buffer_name, pbuf->buffer_header, pbuf->shm_size, pbuf->shm_handle, destroy_flag);
7178 
7179  /* after ss_shm_close() these are invalid: */
7180 
7181  pheader = NULL;
7182  pbuf->buffer_header = NULL;
7183  pbuf->shm_size = 0;
7184  pbuf->shm_handle = 0;
7185 
7186  /* unlock buffer in correct order */
7187 
7188  pbuf_guard.unlock();
7189 
7190  pbuf->write_cache_mutex.unlock();
7191  pbuf->read_cache_mutex.unlock();
7192 
7193  /* delete semaphore */
7194 
7195  ss_semaphore_delete(pbuf->semaphore, destroy_flag);
7196  }
7197 #endif /* LOCAL_ROUTINES */
7198 
7199  return BM_SUCCESS;
7200 }
INT bm_delete_request(INT request_id)
Definition: midas.cxx:8551
static void bm_write_buffer_statistics_to_odb(HNDLE hDB, BUFFER *pbuf, BOOL force)
Definition: midas.cxx:6550
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
Definition: midas.cxx:3005
static int bm_lock_buffer_read_cache(BUFFER *pbuf)
Definition: midas.cxx:7869
static int bm_lock_buffer_write_cache(BUFFER *pbuf)
Definition: midas.cxx:7890
INT ss_resume(INT port, const char *message)
Definition: system.cxx:4783
INT ss_semaphore_delete(HNDLE semaphore_handle, INT destroy_flag)
Definition: system.cxx:2808
INT ss_shm_close(const char *name, void *adr, size_t shm_size, HNDLE handle, INT destroy_flag)
Definition: system.cxx:764
#define RPC_BM_CLOSE_BUFFER
Definition: mrpc.h:37
static std::vector< EventRequest > _request_list
Definition: midas.cxx:222
static std::mutex _request_list_mutex
Definition: midas.cxx:221
#define MAX_CLIENTS
Definition: midas.h:281
INT write_wait
Definition: midas.h:953
BOOL read_wait
Definition: midas.h:952
INT port
Definition: midas.h:945
INT num_clients
Definition: midas.h:965
HNDLE semaphore
Definition: midas.h:1009
size_t read_cache_rp
Definition: midas.h:1002
std::timed_mutex read_cache_mutex
Definition: midas.h:999
std::timed_mutex write_cache_mutex
Definition: midas.h:1004
size_t shm_size
Definition: midas.h:1011
char * read_cache
Definition: midas.h:1001
size_t write_cache_rp
Definition: midas.h:1007
size_t write_cache_wp
Definition: midas.h:1008
char * write_cache
Definition: midas.h:1006
size_t read_cache_wp
Definition: midas.h:1003
std::atomic< size_t > write_cache_size
Definition: midas.h:1005
INT shm_handle
Definition: midas.h:1010
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event()

INT bm_compose_event ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD  serial 
)

Compose a Midas event header. An event header can usually be set-up manually or through this routine. If the data size of the event is not known when the header is composed, it can be set later with event_header->data_size = <...> Following structure is created at the beginning of an event

typedef struct {
short int event_id;
short int trigger_mask;
DWORD data_size;
char event[1000];
bm_compose_event((EVENT_HEADER *)event, 1, 0, 100, 1);
*(event+sizeof(EVENT_HEADER)) = <...>
INT bm_compose_event(EVENT_HEADER *event_header, short int event_id, short int trigger_mask, DWORD data_size, DWORD serial)
Definition: midas.cxx:8246
#define serial_number
Definition: midas_macro.h:235
#define time_stamp
Definition: midas_macro.h:236
Parameters
event_header — pointer to the event header
event_id — event ID of the event
trigger_mask — trigger mask of the event
data_size — size of the data part of the event in bytes
serial — serial number
Returns
BM_SUCCESS

Definition at line 8246 of file midas.cxx.

8247 {
8248  event_header->event_id = event_id;
8249  event_header->trigger_mask = trigger_mask;
8250  event_header->data_size = data_size;
8251  event_header->time_stamp = ss_time();
8252  event_header->serial_number = serial;
8253 
8254  return BM_SUCCESS;
8255 }
INT serial
Definition: minife.c:20
short int event_id
Definition: midas.h:858
DWORD data_size
Definition: midas.h:862
DWORD serial_number
Definition: midas.h:860
short int trigger_mask
Definition: midas.h:859
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_compose_event_threadsafe()

INT bm_compose_event_threadsafe ( EVENT_HEADER event_header,
short int  event_id,
short int  trigger_mask,
DWORD  data_size,
DWORD serial 
)

Definition at line 8257 of file midas.cxx.

8258 {
8259  static std::mutex mutex;
8260 
8261  event_header->event_id = event_id;
8262  event_header->trigger_mask = trigger_mask;
8263  event_header->data_size = data_size;
8264  event_header->time_stamp = ss_time();
8265  {
8266  std::lock_guard<std::mutex> lock(mutex);
8267  event_header->serial_number = *serial;
8268  *serial = *serial + 1;
8269  // implicit unlock
8270  }
8271 
8272  return BM_SUCCESS;
8273 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_convert_event_header()

static void bm_convert_event_header ( EVENT_HEADER pevent,
int  convert_flags 
)
static

Definition at line 9036 of file midas.cxx.

9036  {
9037  /* now convert event header */
9038  if (convert_flags) {
9039  rpc_convert_single(&pevent->event_id, TID_INT16, RPC_OUTGOING, convert_flags);
9040  rpc_convert_single(&pevent->trigger_mask, TID_INT16, RPC_OUTGOING, convert_flags);
9041  rpc_convert_single(&pevent->serial_number, TID_UINT32, RPC_OUTGOING, convert_flags);
9042  rpc_convert_single(&pevent->time_stamp, TID_UINT32, RPC_OUTGOING, convert_flags);
9043  rpc_convert_single(&pevent->data_size, TID_UINT32, RPC_OUTGOING, convert_flags);
9044  }
9045 }
#define TID_UINT32
Definition: midas.h:344
#define TID_INT16
Definition: midas.h:342
void rpc_convert_single(void *data, INT tid, INT flags, INT convert_flags)
Definition: midas.cxx:11648
#define RPC_OUTGOING
Definition: midas.h:1583
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_delete_request()

INT bm_delete_request ( INT  request_id)

Deletes an event request previously done with bm_request_event(). When an event request gets deleted, events of that requested type are not received any more. When a buffer is closed via bm_close_buffer(), all event requests from that buffer are deleted automatically

Parameters
request_id — request identifier given by bm_request_event()
Returns
BM_SUCCESS, BM_INVALID_HANDLE

Definition at line 8551 of file midas.cxx.

8552 {
8553  _request_list_mutex.lock();
8554 
8555  if (request_id < 0 || size_t(request_id) >= _request_list.size()) {
8556  _request_list_mutex.unlock();
8557  return BM_INVALID_HANDLE;
8558  }
8559 
8560  int buffer_handle = _request_list[request_id].buffer_handle;
8561 
8562  _request_list[request_id].clear();
8563 
8564  _request_list_mutex.unlock();
8565 
8566  /* remove request entry from buffer */
8567  return bm_remove_event_request(buffer_handle, request_id);
8568 }
INT bm_remove_event_request(INT buffer_handle, INT request_id)
Definition: midas.cxx:8484
#define BM_INVALID_HANDLE
Definition: midas.h:615
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_dispatch_event()

static void bm_dispatch_event ( int  buffer_handle,
EVENT_HEADER pevent 
)
static

Definition at line 8790 of file midas.cxx.

8791 {
8792  _request_list_mutex.lock();
8793  bool locked = true;
8794  size_t n = _request_list.size();
8795  /* call dispatcher */
8796  for (size_t i = 0; i < n; i++) {
8797  if (!locked) {
8798  _request_list_mutex.lock();
8799  locked = true;
8800  }
8802  if (r.buffer_handle != buffer_handle)
8803  continue;
8804  if (!bm_match_event(r.event_id, r.trigger_mask, pevent))
8805  continue;
8806  /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
8807  _request_list_mutex.unlock();
8808  locked = false;
8809  /* if event is fragmented, call defragmenter */
8810  if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG))) {
8811  bm_defragment_event(buffer_handle, i, pevent, (void *) (pevent + 1), r.dispatcher);
8812  } else {
8813  r.dispatcher(buffer_handle, i, pevent, (void *) (pevent + 1));
8814  }
8815  }
8816  if (locked)
8817  _request_list_mutex.unlock();
8818 }
static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id, EVENT_HEADER *pevent, void *pdata, EVENT_HANDLER *dispatcher)
Definition: midas.cxx:11251
DWORD n[4]
Definition: mana.cxx:247
#define EVENTID_FRAG
Definition: midas.h:913
#define EVENTID_FRAG1
Definition: midas.h:912
short int event_id
Definition: midas.cxx:208
INT buffer_handle
Definition: midas.cxx:207
short int trigger_mask
Definition: midas.cxx:209
EVENT_HANDLER * dispatcher
Definition: midas.cxx:210
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_empty_buffers()

INT bm_empty_buffers ( void  )

Clears event buffer and cache. If an event buffer is large and a consumer is slow in analyzing events, events are usually received some time after they are produced. This effect is even more pronounced if a read cache is used (via bm_set_cache_size()). When changes to the hardware are made in the experiment, the consumer will then still analyze old events before any new event that reflects the hardware change. Users can be fooled by looking at histograms which reflect the hardware change many seconds after it has been made.

To overcome this potential problem, the analyzer can call bm_empty_buffers() just after the hardware change has been made which skips all old events contained in event buffers and read caches. Technically this is done by forwarding the read pointer of the client. No events are really deleted, they are still visible to other clients like the logger.

Note that the front-end also contains write buffers which can delay the delivery of events. The standard front-end framework mfe.c reduces this effect by flushing all buffers once every second.

Returns
BM_SUCCESS

Definition at line 11207 of file midas.cxx.

11207  {
11208  if (rpc_is_remote())
11210 
11211 #ifdef LOCAL_ROUTINES
11212  {
11213  std::vector<BUFFER*> mybuffers;
11214 
11215  gBuffersMutex.lock();
11216  mybuffers = gBuffers;
11217  gBuffersMutex.unlock();
11218 
11219  /* go through all buffers */
11220  for (BUFFER* pbuf : mybuffers) {
11221  if (!pbuf)
11222  continue;
11223  if (!pbuf->attached)
11224  continue;
11225 
11226  int status = bm_skip_event(pbuf);
11227  if (status != BM_SUCCESS)
11228  return status;
11229  }
11230  }
11231 #endif /* LOCAL_ROUTINES */
11232 
11233  return BM_SUCCESS;
11234 }
static int bm_skip_event(BUFFER *pbuf)
Definition: midas.cxx:10800
#define RPC_BM_EMPTY_BUFFERS
Definition: mrpc.h:49
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_fill_read_cache_locked()

static int bm_fill_read_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Definition at line 8959 of file midas.cxx.

8960 {
8961  BUFFER* pbuf = pbuf_guard.get_pbuf();
8962  BUFFER_HEADER* pheader = pbuf->buffer_header;
8963  BUFFER_CLIENT *pc = bm_get_my_client(pbuf, pheader);
8964  BOOL need_wakeup = FALSE;
8965 
8966  //printf("bm_fill_read_cache: [%s] timeout %d, size %d, rp %d, wp %d\n", pheader->name, timeout_msec, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
8967 
8968  /* loop over all events in the buffer */
8969 
8970  while (1) {
8971  EVENT_HEADER *pevent = NULL;
8972  int event_size = 3; // poison value
8973  int total_size = 3; // poison value
8974 
8975  int status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
8976  if (status == BM_CORRUPTED) {
8977  return status;
8978  } else if (status != BM_SUCCESS) {
8979  /* event buffer is empty */
8980  if (timeout_msec == BM_NO_WAIT) {
8981  if (need_wakeup)
8982  bm_wakeup_producers_locked(pheader, pc);
8983  if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8984  // read cache is empty
8985  return BM_ASYNC_RETURN;
8986  }
8987  return BM_SUCCESS;
8988  }
8989 
8990  int status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, TRUE);
8991 
8992  if (status != BM_SUCCESS) {
8993  // we only come here with SS_ABORT & co
8994  return status;
8995  }
8996 
8997  // make sure we wait for new event only once
8998  timeout_msec = BM_NO_WAIT;
8999  // go back to bm_peek_buffer_locked
9000  continue;
9001  }
9002 
9003  /* loop over all requests: if this event matches a request,
9004  * copy it to the read cache */
9005 
9006  BOOL is_requested = bm_check_requests(pc, pevent);
9007 
9008  if (is_requested) {
9009  if (pbuf->read_cache_wp + total_size > pbuf->read_cache_size) {
9010  /* read cache is full */
9011  if (need_wakeup)
9012  bm_wakeup_producers_locked(pheader, pc);
9013  return BM_SUCCESS;
9014  }
9015 
9017 
9018  pbuf->read_cache_wp += total_size;
9019 
9020  /* update statistics */
9021  pheader->num_out_events++;
9022  pbuf->count_read++;
9023  pbuf->bytes_read += event_size;
9024  }
9025 
9026  /* shift read pointer */
9027 
9028  int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9029  pc->read_pointer = new_read_pointer;
9030 
9031  need_wakeup = TRUE;
9032  }
9033  /* NOT REACHED */
9034 }
BUFFER * get_pbuf() const
Definition: midas.cxx:3179
static void bm_wakeup_producers_locked(const BUFFER_HEADER *pheader, const BUFFER_CLIENT *pc)
Definition: midas.cxx:8754
static int bm_incr_rp_no_check(const BUFFER_HEADER *pheader, int rp, int total_size)
Definition: midas.cxx:6187
static BOOL bm_check_requests(const BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition: midas.cxx:8933
static void bm_read_from_buffer_locked(const BUFFER_HEADER *pheader, int rp, char *buf, int event_size)
Definition: midas.cxx:8903
static int bm_peek_buffer_locked(BUFFER *pbuf, BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition: midas.cxx:8858
static int bm_wait_for_more_events_locked(bm_lock_buffer_guard &pbuf_guard, BUFFER_CLIENT *pc, int timeout_msec, BOOL unlock_read_cache)
Definition: midas.cxx:9359
#define BM_ASYNC_RETURN
Definition: midas.h:619
#define BM_NO_WAIT
Definition: midas.h:373
int event_size
Definition: msysmon.cxx:526
INT read_pointer
Definition: midas.h:946
INT num_out_events
Definition: midas.h:971
double bytes_read
Definition: midas.h:1027
int count_read
Definition: midas.h:1026
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_find_first_request_locked()

static int bm_find_first_request_locked ( BUFFER_CLIENT pc,
const EVENT_HEADER pevent 
)
static

Definition at line 9556 of file midas.cxx.

/* Scan the client's event-request table and return the id of the first
 * request that matches this event (by event_id and trigger_mask via
 * bm_match_event()), or -1 if the client slot is unused (pid == 0) or
 * no valid request matches. Caller must hold the buffer lock. */
9556  {
9557  if (pc->pid) { /* only consider live client slots */
9558  int j;
9559  for (j = 0; j < pc->max_request_index; j++) {
9560  const EVENT_REQUEST *prequest = pc->event_request + j;
9561  if (prequest->valid && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9562  return prequest->id;
9563  }
9564  }
9565  }
9566 
9567  return -1;
9568 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache()

INT bm_flush_cache ( int  buffer_handle,
int  timeout_msec 
)

Definition at line 10174 of file midas.cxx.

10175 {
10176  if (rpc_is_remote()) {
10177  return bm_flush_cache_rpc(buffer_handle, timeout_msec);
10178  }
10179 
10180 #ifdef LOCAL_ROUTINES
10181  {
10182  INT status = 0;
10183 
10184  //printf("bm_flush_cache!\n");
10185 
10186  BUFFER *pbuf = bm_get_buffer("bm_flush_cache", buffer_handle, &status);
10187 
10188  if (!pbuf)
10189  return status;
10190 
10191  if (pbuf->write_cache_size == 0)
10192  return BM_SUCCESS;
10193 
10195 
10196  if (status != BM_SUCCESS)
10197  return status;
10198 
10199  /* check if anything needs to be flushed */
10200  if (pbuf->write_cache_wp == 0) {
10201  pbuf->write_cache_mutex.unlock();
10202  return BM_SUCCESS;
10203  }
10204 
10205  /* lock the buffer */
10206  bm_lock_buffer_guard pbuf_guard(pbuf);
10207 
10208  if (!pbuf_guard.is_locked())
10209  return pbuf_guard.get_status();
10210 
10211  status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
10212 
10213  /* unlock in correct order */
10214 
10215  if (pbuf_guard.is_locked()) {
10216  // check if bm_wait_for_free_space() failed to relock the buffer
10217  pbuf_guard.unlock();
10218  }
10219 
10220  pbuf->write_cache_mutex.unlock();
10221 
10222  return status;
10223  }
10224 #endif /* LOCAL_ROUTINES */
10225 
10226  return BM_SUCCESS;
10227 }
static int bm_flush_cache_rpc(int buffer_handle, int timeout_msec)
Definition: midas.cxx:9954
static INT bm_flush_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition: midas.cxx:10030
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_locked()

static INT bm_flush_cache_locked ( bm_lock_buffer_guard pbuf_guard,
int  timeout_msec 
)
static

Empty write cache. This function should be used if events in the write cache should be visible to the consumers immediately. It should be called at the end of each run, otherwise events could be kept in the write buffer and would flow into the data of the next run.

Parameters
buffer_handleBuffer handle obtained via bm_open_buffer() or 0 to flush data in the mserver event socket
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the cache.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN Routine called with async_flag == BM_NO_WAIT and buffer has not enough space to receive cache
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 10030 of file midas.cxx.

/* Flush all events accumulated in the per-process write cache into the
 * shared-memory event buffer. Called with both the write-cache mutex and
 * the buffer lock held (see NB below). May sleep inside
 * bm_wait_for_free_space_locked() with locks released, so cache pointers
 * are re-read after it returns. Returns BM_SUCCESS, BM_ASYNC_RETURN on
 * timeout, or an error status. */
10031 {
10032  // NB we come here with write cache locked and buffer locked.
10033 
10034  {
10035  INT status = 0;
10036 
10037  //printf("bm_flush_cache_locked!\n");
10038 
10039  BUFFER* pbuf = pbuf_guard.get_pbuf();
10040  BUFFER_HEADER* pheader = pbuf->buffer_header;
10041 
10042  //printf("bm_flush_cache_locked: buffer %s, cache rp %zu, wp %zu, timeout %d msec\n", pbuf->buffer_name, pbuf->write_cache_rp, pbuf->write_cache_wp, timeout_msec);
10043 
10044  int old_write_pointer = pheader->write_pointer;
10045 
10046  /* per-client id of a matching request, -1 = no match; used below to
10047  * decide which waiting readers to wake up */
10046  int request_id[MAX_CLIENTS];
10047  for (int i = 0; i < pheader->max_client_index; i++) {
10048  request_id[i] = -1;
10049  }
10050 
10051  size_t ask_rp = pbuf->write_cache_rp;
10052  size_t ask_wp = pbuf->write_cache_wp;
10053 
10054  if (ask_wp == 0) { // nothing to do
10055  return BM_SUCCESS;
10056  }
10057 
10058  if (ask_rp == ask_wp) { // nothing to do
10059  return BM_SUCCESS;
10060  }
10061 
10062  assert(ask_rp < ask_wp);
10063 
10064  size_t ask_free = ALIGN8(ask_wp - ask_rp);
10065 
10066  if (ask_free == 0) { // nothing to do
10067  return BM_SUCCESS;
10068  }
10069 
10070 #if 0
10072  if (status != BM_SUCCESS) {
10073  printf("bm_flush_cache: corrupted 111!\n");
10074  abort();
10075  }
10076 #endif
10077 
10078  status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, ask_free, true);
10079 
10080  if (status != BM_SUCCESS) {
10081  return status;
10082  }
10083 
10084  // NB: ask_rp, ask_wp and ask_free are invalid after calling bm_wait_for_free_space():
10085  //
10086  // wait_for_free_space() will sleep with all locks released,
10087  // during this time, another thread may call bm_send_event() that will
10088  // add one or more events to the write cache and after wait_for_free_space()
10089  // returns, size of data in cache will be bigger than the amount
10090  // of free space we requested. so we need to keep track of how
10091  // much data we write to the buffer and ask for more data
10092  // if we run short. This is the reason for the big loop
10093  // around wait_for_free_space(). We ask for slightly too little free
10094  // space to make sure all this code is always used and does work. K.O.
10095 
10096  if (pbuf->write_cache_wp == 0) {
10097  /* somebody emptied the cache while we were inside bm_wait_for_free_space */
10098  return BM_SUCCESS;
10099  }
10100 
10101  //size_t written = 0;
10102  while (pbuf->write_cache_rp < pbuf->write_cache_wp) {
10103  /* loop over all events in cache */
10104 
10105  const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pbuf->write_cache + pbuf->write_cache_rp);
10106  size_t event_size = (pevent->data_size + sizeof(EVENT_HEADER));
10107  size_t total_size = ALIGN8(event_size);
10108 
10109 #if 0
10110  printf("bm_flush_cache: cache size %d, wp %d, rp %d, event data_size %d, event_size %d, total_size %d, free %d, written %d\n",
10111  int(pbuf->write_cache_size),
10112  int(pbuf->write_cache_wp),
10113  int(pbuf->write_cache_rp),
10114  int(pevent->data_size),
10115  int(event_size),
10116  int(total_size),
10117  int(ask_free),
10118  int(written));
10119 #endif
10120 
10121  // check for crazy event size
10122  assert(total_size >= sizeof(EVENT_HEADER));
10123  assert(total_size <= (size_t)pheader->size);
10124 
10125  bm_write_to_buffer_locked(pheader, 1, (char**)&pevent, &event_size, total_size);
10126 
10127  /* update statistics */
10128  pheader->num_in_events++;
10129  pbuf->count_sent += 1;
10130  pbuf->bytes_sent += total_size;
10131 
10132  /* see comment for the same code in bm_send_event().
10133  * We make sure the buffer is never 100% full */
10134  assert(pheader->write_pointer != pheader->read_pointer);
10135 
10136  /* check if anybody has a request for this event */
10137  for (int i = 0; i < pheader->max_client_index; i++) {
10138  BUFFER_CLIENT *pc = pheader->client + i;
10139  int r = bm_find_first_request_locked(pc, pevent);
10140  if (r >= 0) {
10141  request_id[i] = r;
10142  }
10143  }
10144 
10145  /* this loop does not loop forever because rp
10146  * is monotonously incremented here. write_cache_wp does
10147  * not change */
10148 
10149  pbuf->write_cache_rp += total_size;
10150  //written += total_size;
10151 
10152  assert(pbuf->write_cache_rp > 0);
10153  assert(pbuf->write_cache_rp <= pbuf->write_cache_size);
10154  assert(pbuf->write_cache_rp <= pbuf->write_cache_wp);
10155  }
10156 
10157  /* the write cache is now empty */
10158  assert(pbuf->write_cache_wp == pbuf->write_cache_rp);
10159  pbuf->write_cache_wp = 0;
10160  pbuf->write_cache_rp = 0;
10161 
10162  /* check which clients are waiting */
10163  for (int i = 0; i < pheader->max_client_index; i++) {
10164  BUFFER_CLIENT *pc = pheader->client + i;
10165  bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id[i]);
10166  }
10167  }
10168 
10169  return BM_SUCCESS;
10170 }
static void bm_notify_reader_locked(BUFFER_HEADER *pheader, BUFFER_CLIENT *pc, int old_write_pointer, int request_id)
Definition: midas.cxx:9570
static int bm_find_first_request_locked(BUFFER_CLIENT *pc, const EVENT_HEADER *pevent)
Definition: midas.cxx:9556
static int bm_validate_buffer_locked(const BUFFER *pbuf)
Definition: midas.cxx:6271
static void bm_write_to_buffer_locked(BUFFER_HEADER *pheader, int sg_n, const char *const sg_ptr[], const size_t sg_len[], size_t total_size)
Definition: midas.cxx:9471
static int bm_wait_for_free_space_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec, int requested_space, bool unlock_write_cache)
Definition: midas.cxx:9047
#define ALIGN8(x)
Definition: midas.h:528
INT num_in_events
Definition: midas.h:970
INT write_pointer
Definition: midas.h:969
INT read_pointer
Definition: midas.h:968
INT size
Definition: midas.h:967
int count_sent
Definition: midas.h:1018
double bytes_sent
Definition: midas.h:1019
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_flush_cache_rpc()

static int bm_flush_cache_rpc ( int  buffer_handle,
int  timeout_msec 
)
static

Definition at line 9954 of file midas.cxx.

/* Remote (RPC) version of bm_flush_cache(): repeatedly ask the server to
 * flush, using per-call timeouts capped at 1000 msec so that BM_WAIT
 * (wait forever) keeps retrying and finite timeouts are honoured across
 * multiple RPC round trips. Returns the final rpc_call() status:
 * BM_SUCCESS, BM_ASYNC_RETURN on timeout/no-wait, or an error code. */
9955 {
9956  //printf("bm_flush_cache_rpc: handle %d, timeout %d\n", buffer_handle, timeout_msec);
9957 
9958  DWORD time_start = ss_millitime();
9959  DWORD time_end = time_start + timeout_msec;
9960 
9961  int xtimeout_msec = timeout_msec;
9962 
9963  while (1) {
9964  if (timeout_msec == BM_WAIT) {
9965  xtimeout_msec = 1000;
9966  } else if (timeout_msec == BM_NO_WAIT) {
9967  xtimeout_msec = BM_NO_WAIT;
9968  } else {
9969  if (xtimeout_msec > 1000) {
9970  xtimeout_msec = 1000;
9971  }
9972  }
9973 
9974  int status = rpc_call(RPC_BM_FLUSH_CACHE, buffer_handle, xtimeout_msec);
9975 
9976  //printf("bm_flush_cache_rpc: handle %d, timeout %d, status %d\n", buffer_handle, xtimeout_msec, status);
9977 
9978  if (status == BM_ASYNC_RETURN) {
9979  if (timeout_msec == BM_WAIT) {
9980  // BM_WAIT means wait forever
9981  continue;
9982  } else if (timeout_msec == BM_NO_WAIT) {
9983  // BM_NO_WAIT means do not wait
9984  return status;
9985  } else {
9986  DWORD now = ss_millitime();
9987  if (now >= time_end) {
9988  // timeout, return BM_ASYNC_RETURN
9989  return status;
9990  }
9991 
9992  DWORD remain = time_end - now;
9993 
9994  /* shrink the last slice so we never overshoot the caller's deadline */
9994  if (remain < (DWORD)xtimeout_msec) {
9995  xtimeout_msec = remain;
9996  }
9997 
9998  // keep asking for event...
9999  continue;
10000  }
10001  } else if (status == BM_SUCCESS) {
10002  // success, return BM_SUCCESS
10003  return status;
10004  } else {
10005  // error
10006  return status;
10007  }
10008  }
10009 }
#define BM_WAIT
Definition: midas.h:372
#define RPC_BM_FLUSH_CACHE
Definition: mrpc.h:46
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_get_buffer_handle()

INT bm_get_buffer_handle ( const char *  buffer_name,
INT buffer_handle 
)

If the buffer is already open, return its handle

Parameters
buffer_namebuffer name
Returns
BM_SUCCESS, BM_NOT_FOUND

Definition at line 7039 of file midas.cxx.

/* Look up an already-attached buffer by name (case-insensitive match via
 * equal_ustring) in the global gBuffers table, guarded by gBuffersMutex.
 * On success stores the 1-based handle and returns BM_SUCCESS; returns
 * BM_NOT_FOUND if no attached buffer has this name. */
7040 {
7041  gBuffersMutex.lock();
7042  for (size_t i = 0; i < gBuffers.size(); i++) {
7043  BUFFER* pbuf = gBuffers[i];
7044  if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
7045  *buffer_handle = i + 1; /* handles are 1-based; 0 means "invalid" */
7046  gBuffersMutex.unlock();
7047  return BM_SUCCESS;
7048  }
7049  }
7050  gBuffersMutex.unlock();
7051  return BM_NOT_FOUND;
7052 }
#define BM_NOT_FOUND
Definition: midas.h:618
BOOL equal_ustring(const char *str1, const char *str2)
Definition: odb.cxx:3191
char buffer_name[NAME_LENGTH]
Definition: mevb.c:45
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_incr_read_cache_locked()

static void bm_incr_read_cache_locked ( BUFFER pbuf,
int  total_size 
)
static

Definition at line 8822 of file midas.cxx.

/* Advance the read cache read pointer past a consumed event of
 * total_size bytes; when the cache is fully consumed (rp catches up
 * with wp), reset both pointers to zero so the cache is refilled from
 * the start. Caller must hold the read cache lock. */
8822  {
8823  /* increment read cache read pointer */
8824  pbuf->read_cache_rp += total_size;
8825 
8826  if (pbuf->read_cache_rp == pbuf->read_cache_wp) {
8827  /* cache fully consumed: rewind so the next fill starts at offset 0 */
8827  pbuf->read_cache_rp = 0;
8828  pbuf->read_cache_wp = 0;
8829  }
8830 }
Here is the caller graph for this function:

◆ bm_incr_rp_no_check()

static int bm_incr_rp_no_check ( const BUFFER_HEADER pheader,
int  rp,
int  total_size 
)
static

Definition at line 6187 of file midas.cxx.

/* Advance a buffer read pointer by total_size bytes, applying the
 * circular-buffer wrap rules of the shared event buffer. No validation
 * of the event at rp is performed (hence "no_check") beyond the
 * assertions below; callers are expected to have validated it already. */
6188 {
6189 #if 0
6190  if (gRpLog == NULL) {
6191  gRpLog = fopen("rp.log", "a");
6192  }
6193  if (gRpLog && (total_size < 16)) {
6194  const char *pdata = (const char *) (pheader + 1);
6195  const DWORD *pevent = (const DWORD*) (pdata + rp);
6196  fprintf(gRpLog, "%s: rp %d, total_size %d, at rp 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x 0x%08x\n", pheader->name, rp, total_size,
6197  pevent[0], pevent[1], pevent[2], pevent[3], pevent[4], pevent[5]);
6198  }
6199 #endif
6200 
6201  // these checks are already done before we come here.
6202  // but we check again as last-resort protection. K.O.
6203  assert(total_size > 0);
6204  assert(total_size >= (int)sizeof(EVENT_HEADER));
6205 
6206  rp += total_size;
6207  if (rp >= pheader->size) {
6208  rp -= pheader->size;
6209  } else if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6210  // note: ">" here to match bm_write_to_buffer_locked() and bm_validate_rp().
6211  // if at the end of the buffer, the remaining free space is exactly
6212  // equal to the size of an event header, the event header
6213  // is written there, the pointer is wrapped and the event data
6214  // is written to the beginning of the buffer.
6215  rp = 0;
6216  }
6217  return rp;
6218 }
Here is the caller graph for this function:

◆ bm_match_event()

INT bm_match_event ( short int  event_id,
short int  trigger_mask,
const EVENT_HEADER pevent 
)

Check if an event matches a given event request by the event id and trigger mask

Parameters
event_idEvent ID of request
trigger_maskTrigger mask of request
peventPointer to event to check
Returns
TRUE if event matches request

Definition at line 5978 of file midas.cxx.

/* Return TRUE if an event matches a request: either the request's
 * event_id is EVENTID_ALL or equal to the event's id, AND either the
 * request's trigger_mask is TRIGGER_ALL or shares at least one bit with
 * the event's trigger mask. For fragmented events (top nibble of
 * event_id is EVENTID_FRAG1/EVENTID_FRAG) the comparison uses only the
 * low 12 bits, which carry the original event id. */
5978  {
5979  // NB: cast everything to unsigned 16 bit to avoid bitwise comparison failure
5980  // because of mismatch in sign-extension between signed 16-bit event_id and
5981  // unsigned 16-bit constants. K.O.
5982 
5983  if (((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG1)) || ((uint16_t(pevent->event_id) & uint16_t(0xF000)) == uint16_t(EVENTID_FRAG)))
5984  /* fragmented event */
5985  return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == (uint16_t(pevent->event_id) & uint16_t(0x0FFF))))
5986  && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
5987 
5988  return (((uint16_t(event_id) == uint16_t(EVENTID_ALL)) || (uint16_t(event_id) == uint16_t(pevent->event_id)))
5989  && ((uint16_t(trigger_mask) == uint16_t(TRIGGER_ALL)) || ((uint16_t(trigger_mask) & uint16_t(pevent->trigger_mask)))));
5990 }
#define TRIGGER_ALL
Definition: midas.h:544
#define EVENTID_ALL
Definition: midas.h:543
Here is the caller graph for this function:

◆ bm_next_rp()

static int bm_next_rp ( const char *  who,
const BUFFER_HEADER pheader,
const char *  pdata,
int  rp 
)
static

Definition at line 6220 of file midas.cxx.

/* Compute the read pointer that follows the event currently at rp:
 * read the event header at pdata+rp, sanity-check its size against the
 * buffer size and against the distance to the write pointer, then
 * advance with bm_incr_rp_no_check(). Returns the new rp, or -1 (with
 * a cm_msg error naming the caller via 'who') if the buffer looks
 * corrupted. */
6220  {
6221  const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + rp);
6222  int event_size = pevent->data_size + sizeof(EVENT_HEADER);
6223  int total_size = ALIGN8(event_size);
6224 
6225  if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
6226  cm_msg(MERROR, "bm_next_rp",
6227  "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6228  pheader->name,
6229  rp,
6230  pevent->data_size,
6231  event_size,
6232  total_size,
6233  pheader->read_pointer,
6234  pheader->write_pointer,
6235  pheader->size,
6236  who);
6237  return -1;
6238  }
6239 
6240  /* bytes of valid data between rp and the write pointer, accounting
6241  * for circular-buffer wrap-around */
6240  int remaining = 0;
6241  if (rp < pheader->write_pointer) {
6242  remaining = pheader->write_pointer - rp;
6243  } else {
6244  remaining = pheader->size - rp;
6245  remaining += pheader->write_pointer;
6246  }
6247 
6248  //printf("bm_next_rp: total_size %d, remaining %d, rp %d, wp %d, size %d\n", total_size, remaining, rp, pheader->write_pointer, pheader->size);
6249 
6250  if (total_size > remaining) {
6251  cm_msg(MERROR, "bm_next_rp",
6252  "error: buffer \"%s\" is corrupted: rp %d points to an invalid event: data_size %d, event size %d, total_size %d, buffer read_pointer %d, write_pointer %d, size %d, remaining %d, called from %s",
6253  pheader->name,
6254  rp,
6255  pevent->data_size,
6256  event_size,
6257  total_size,
6258  pheader->read_pointer,
6259  pheader->write_pointer,
6260  pheader->size,
6261  remaining,
6262  who);
6263  return -1;
6264  }
6265 
6266  rp = bm_incr_rp_no_check(pheader, rp, total_size);
6267 
6268  return rp;
6269 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_notify_reader_locked()

static void bm_notify_reader_locked ( BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
int  old_write_pointer,
int  request_id 
)
static

Definition at line 9570 of file midas.cxx.

/* If the given client has a matching request (request_id >= 0) and is
 * suspended waiting for data (read_wait), send it a resume message
 * ("B <buffer> <request_id>") via ss_resume() and clear its read_wait
 * flag. Caller must hold the buffer lock. */
9570  {
9571  if (request_id >= 0) {
9572  /* if that client has a request and is suspended, wake it up */
9573  if (pc->read_wait) {
9574  /* NOTE(review): str[80] assumed large enough for buffer name
9574  * (bounded by NAME_LENGTH) plus request id — confirm NAME_LENGTH */
9574  char str[80];
9575  sprintf(str, "B %s %d", pheader->name, request_id);
9576  ss_resume(pc->port, str);
9577  //printf("bm_notify_reader_locked: buffer [%s] client [%s] request_id %d, port %d, message [%s]\n", pheader->name, pc->name, request_id, pc->port, str);
9578  //printf("bm_notify_reader_locked: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9579  pc->read_wait = FALSE;
9580  }
9581  }
9582 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_open_buffer()

INT bm_open_buffer ( const char *  buffer_name,
INT  buffer_size,
INT buffer_handle 
)

Open an event buffer. Two default buffers are created by the system. The "SYSTEM" buffer is used to exchange events and the "SYSMSG" buffer is used to exchange system messages. The names and sizes of the event buffers are defined in midas.h as EVENT_BUFFER_NAME and DEFAULT_BUFFER_SIZE. The following example opens the "SYSTEM" buffer, requests events with ID 1 and enters a main loop. Events are then received in process_event()

#include <stdio.h>
#include "midas.h"
void process_event(HNDLE hbuf, HNDLE request_id, EVENT_HEADER *pheader, void *pevent)
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
status = cm_connect_experiment("pc810", "Sample", "Simple Analyzer", NULL);
return 1;
do
{
status = cm_yield(1000);
} while (status != RPC_SHUTDOWN && status != SS_ABORT);
return 0;
}
INT bm_open_buffer(const char *buffer_name, INT buffer_size, INT *buffer_handle)
Definition: midas.cxx:6681
INT bm_request_event(HNDLE buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, HNDLE *request_id, EVENT_HANDLER *func)
Definition: midas.cxx:8431
INT cm_yield(INT millisec)
Definition: midas.cxx:5638
INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void(*func)(char *))
Definition: midas.cxx:2280
INT cm_disconnect_experiment(void)
Definition: midas.cxx:2840
#define CM_SUCCESS
Definition: midas.h:588
#define SS_ABORT
Definition: midas.h:683
#define RPC_SHUTDOWN
Definition: midas.h:713
int main(int argc, char *argv[])
Definition: mana.cxx:5320
INT process_event(ANALYZE_REQUEST *par, EVENT_HEADER *pevent)
Definition: mana.cxx:3081
#define DEFAULT_BUFFER_SIZE
Definition: midas.h:262
#define EVENT_BUFFER_NAME
Definition: midas.h:276
Parameters
buffer_nameName of buffer
buffer_sizeDefault size of buffer in bytes. Can by overwritten with ODB value
buffer_handleBuffer handle returned by function
Returns
BM_SUCCESS, BM_CREATED
BM_NO_SHM Shared memory cannot be created
BM_NO_SEMAPHORE Semaphore cannot be created
BM_NO_MEMORY Not enough memory to create buffer descriptor
BM_MEMSIZE_MISMATCH Buffer size conflicts with an existing buffer of different size
BM_INVALID_PARAM Invalid parameter

Definition at line 6681 of file midas.cxx.

6681  {
6682  INT status;
6683 
6684  if (rpc_is_remote()) {
6685  status = rpc_call(RPC_BM_OPEN_BUFFER, buffer_name, buffer_size, buffer_handle);
6686 
6687  HNDLE hDB;
6689  if (status != SUCCESS || hDB == 0) {
6690  cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6691  return BM_NO_SHM;
6692  }
6693 
6695 
6696  int size = sizeof(INT);
6697  status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6698 
6699  if (status != DB_SUCCESS) {
6700  cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6701  status);
6702  return status;
6703  }
6704 
6705  return status;
6706  }
6707 #ifdef LOCAL_ROUTINES
6708  {
6709  HNDLE shm_handle;
6710  size_t shm_size;
6711  HNDLE hDB;
6712  const int max_buffer_size = 2 * 1000 * 1024 * 1024; // limited by 32-bit integers in the buffer header
6713 
6714  bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
6715 
6716  if (!buffer_name || !buffer_name[0]) {
6717  cm_msg(MERROR, "bm_open_buffer", "cannot open buffer with zero name");
6718  return BM_INVALID_PARAM;
6719  }
6720 
6721  if (strlen(buffer_name) >= NAME_LENGTH) {
6722  cm_msg(MERROR, "bm_open_buffer", "buffer name \"%s\" is longer than %d bytes", buffer_name, NAME_LENGTH);
6723  return BM_INVALID_PARAM;
6724  }
6725 
6727 
6728  if (status != SUCCESS || hDB == 0) {
6729  //cm_msg(MERROR, "bm_open_buffer", "cannot open buffer \'%s\' - not connected to ODB", buffer_name);
6730  return BM_NO_SHM;
6731  }
6732 
6733  /* get buffer size from ODB, user parameter as default if not present in ODB */
6734  std::string odb_path;
6735  odb_path += "/Experiment/Buffer sizes/";
6736  odb_path += buffer_name;
6737 
6738  int size = sizeof(INT);
6739  status = db_get_value(hDB, 0, odb_path.c_str(), &buffer_size, &size, TID_UINT32, TRUE);
6740 
6741  if (buffer_size <= 0 || buffer_size > max_buffer_size) {
6742  cm_msg(MERROR, "bm_open_buffer",
6743  "Cannot open buffer \"%s\", invalid buffer size %d in ODB \"%s\", maximum buffer size is %d",
6744  buffer_name, buffer_size, odb_path.c_str(), max_buffer_size);
6745  return BM_INVALID_PARAM;
6746  }
6747 
6749 
6750  size = sizeof(INT);
6751  status = db_get_value(hDB, 0, "/Experiment/MAX_EVENT_SIZE", &_bm_max_event_size, &size, TID_UINT32, TRUE);
6752 
6753  if (status != DB_SUCCESS) {
6754  cm_msg(MERROR, "bm_open_buffer", "Cannot get ODB /Experiment/MAX_EVENT_SIZE, db_get_value() status %d",
6755  status);
6756  return status;
6757  }
6758 
6759  /* check if buffer already is open */
6760  gBuffersMutex.lock();
6761  for (size_t i = 0; i < gBuffers.size(); i++) {
6762  BUFFER* pbuf = gBuffers[i];
6763  if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6764  *buffer_handle = i + 1;
6765  gBuffersMutex.unlock();
6766  return BM_SUCCESS;
6767  }
6768  }
6769  gBuffersMutex.unlock();
6770 
6771  // only one thread at a time should create new buffers
6772 
6773  static std::mutex gNewBufferMutex;
6774  std::lock_guard<std::mutex> guard(gNewBufferMutex);
6775 
6776  // if we had a race against another thread
6777  // and while we were waiting for gNewBufferMutex
6778  // the other thread created this buffer, we return it.
6779 
6780  gBuffersMutex.lock();
6781  for (size_t i = 0; i < gBuffers.size(); i++) {
6782  BUFFER* pbuf = gBuffers[i];
6783  if (pbuf && pbuf->attached && equal_ustring(pbuf->buffer_name, buffer_name)) {
6784  *buffer_handle = i + 1;
6785  gBuffersMutex.unlock();
6786  return BM_SUCCESS;
6787  }
6788  }
6789  gBuffersMutex.unlock();
6790 
6791  /* allocate new BUFFER object */
6792 
6793  BUFFER* pbuf = new BUFFER;
6794 
6795  /* there is no constructor for BUFFER object, we have to zero the arrays manually */
6796 
6797  for (int i=0; i<MAX_CLIENTS; i++) {
6798  pbuf->client_count_write_wait[i] = 0;
6799  pbuf->client_time_write_wait[i] = 0;
6800  }
6801 
6802  /* create buffer semaphore */
6803 
6805 
6806  if (status != SS_CREATED && status != SS_SUCCESS) {
6807  *buffer_handle = 0;
6808  delete pbuf;
6809  return BM_NO_SEMAPHORE;
6810  }
6811 
6812  std::string client_name = cm_get_client_name();
6813 
6814  /* store client name */
6815  strlcpy(pbuf->client_name, client_name.c_str(), sizeof(pbuf->client_name));
6816 
6817  /* store buffer name */
6818  strlcpy(pbuf->buffer_name, buffer_name, sizeof(pbuf->buffer_name));
6819 
6820  /* lock buffer semaphore to avoid race with bm_open_buffer() in a different program */
6821 
6822  pbuf->attached = true; // required by bm_lock_buffer()
6823 
6824  bm_lock_buffer_guard pbuf_guard(pbuf);
6825 
6826  if (!pbuf_guard.is_locked()) {
6827  // cannot happen, no other thread can see this pbuf
6828  abort();
6829  return BM_NO_SEMAPHORE;
6830  }
6831 
6832  /* open shared memory */
6833 
6834  void *p = NULL;
6835  status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6836 
6837  if (status != SS_SUCCESS && status != SS_CREATED) {
6838  *buffer_handle = 0;
6839  pbuf_guard.unlock();
6840  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6841  delete pbuf;
6842  return BM_NO_SHM;
6843  }
6844 
6845  pbuf->buffer_header = (BUFFER_HEADER *) p;
6846 
6847  BUFFER_HEADER *pheader = pbuf->buffer_header;
6848 
6849  bool shm_created = (status == SS_CREATED);
6850 
6851  if (shm_created) {
6852  /* initialize newly created shared memory */
6853 
6854  memset(pheader, 0, sizeof(BUFFER_HEADER) + buffer_size);
6855 
6856  strlcpy(pheader->name, buffer_name, sizeof(pheader->name));
6857  pheader->size = buffer_size;
6858 
6859  } else {
6860  /* validate existing shared memory */
6861 
6862  if (!equal_ustring(pheader->name, buffer_name)) {
6863  // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6864  pbuf_guard.unlock();
6865  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6866  cm_msg(MERROR, "bm_open_buffer",
6867  "Buffer \"%s\" is corrupted, mismatch of buffer name in shared memory \"%s\"", buffer_name,
6868  pheader->name);
6869  *buffer_handle = 0;
6870  delete pbuf;
6871  return BM_CORRUPTED;
6872  }
6873 
6874  if ((pheader->num_clients < 0) || (pheader->num_clients > MAX_CLIENTS)) {
6875  // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6876  pbuf_guard.unlock();
6877  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6878  cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, num_clients %d exceeds MAX_CLIENTS %d",
6879  buffer_name, pheader->num_clients, MAX_CLIENTS);
6880  *buffer_handle = 0;
6881  delete pbuf;
6882  return BM_CORRUPTED;
6883  }
6884 
6885  if ((pheader->max_client_index < 0) || (pheader->max_client_index > MAX_CLIENTS)) {
6886  // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6887  pbuf_guard.unlock();
6888  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6889  cm_msg(MERROR, "bm_open_buffer", "Buffer \"%s\" is corrupted, max_client_index %d exceeds MAX_CLIENTS %d",
6891  *buffer_handle = 0;
6892  delete pbuf;
6893  return BM_CORRUPTED;
6894  }
6895 
6896  /* check if buffer size is identical */
6897  if (pheader->size != buffer_size) {
6898  cm_msg(MINFO, "bm_open_buffer", "Buffer \"%s\" requested size %d differs from existing size %d",
6899  buffer_name, buffer_size, pheader->size);
6900 
6901  buffer_size = pheader->size;
6902 
6903  ss_shm_close(buffer_name, p, shm_size, shm_handle, FALSE);
6904 
6905  status = ss_shm_open(buffer_name, sizeof(BUFFER_HEADER) + buffer_size, &p, &shm_size, &shm_handle, FALSE);
6906 
6907  if (status != SS_SUCCESS) {
6908  *buffer_handle = 0;
6909  pbuf_guard.unlock();
6910  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6911  delete pbuf;
6912  return BM_NO_SHM;
6913  }
6914 
6915  pbuf->buffer_header = (BUFFER_HEADER *) p;
6916  pheader = pbuf->buffer_header;
6917  }
6918  }
6919 
6920  /* shared memory is good from here down */
6921 
6922  pbuf->attached = true;
6923 
6924  pbuf->shm_handle = shm_handle;
6925  pbuf->shm_size = shm_size;
6926  pbuf->callback = FALSE;
6927 
6928  bm_cleanup_buffer_locked(pbuf, "bm_open_buffer", ss_millitime());
6929 
6931  if (status != BM_SUCCESS) {
6932  cm_msg(MERROR, "bm_open_buffer",
6933  "buffer \'%s\' is corrupted, bm_validate_buffer() status %d, calling bm_reset_buffer()...", buffer_name,
6934  status);
6935  bm_reset_buffer_locked(pbuf);
6936  cm_msg(MINFO, "bm_open_buffer", "buffer \'%s\' was reset, all buffered events were lost", buffer_name);
6937  }
6938 
6939  /* add our client BUFFER_HEADER */
6940 
6941  int iclient = 0;
6942  for (; iclient < MAX_CLIENTS; iclient++)
6943  if (pheader->client[iclient].pid == 0)
6944  break;
6945 
6946  if (iclient == MAX_CLIENTS) {
6947  *buffer_handle = 0;
6948  // unlock before calling cm_msg(). if we are SYSMSG, we will deadlock. K.O.
6949  pbuf_guard.unlock();
6950  pbuf_guard.invalidate(); // destructor will see a deleted pbuf
6951  delete pbuf;
6952  cm_msg(MERROR, "bm_open_buffer", "buffer \'%s\' maximum number of clients %d exceeded", buffer_name, MAX_CLIENTS);
6953  return BM_NO_SLOT;
6954  }
6955 
6956  /* store slot index in _buffer structure */
6957  pbuf->client_index = iclient;
6958 
6959  /*
6960  Save the index of the last client of that buffer so that later only
6961  the clients 0..max_client_index-1 have to be searched through.
6962  */
6963  pheader->num_clients++;
6964  if (iclient + 1 > pheader->max_client_index)
6965  pheader->max_client_index = iclient + 1;
6966 
6967  /* setup buffer header and client structure */
6968  BUFFER_CLIENT *pclient = &pheader->client[iclient];
6969 
6970  memset(pclient, 0, sizeof(BUFFER_CLIENT));
6971 
6972  strlcpy(pclient->name, client_name.c_str(), sizeof(pclient->name));
6973 
6974  pclient->pid = ss_getpid();
6975 
6977 
6978  pclient->read_pointer = pheader->write_pointer;
6979  pclient->last_activity = ss_millitime();
6980 
6981  cm_get_watchdog_params(NULL, &pclient->watchdog_timeout);
6982 
6983  pbuf_guard.unlock();
6984 
6985  /* shared memory is not locked from here down, do not touch pheader and pbuf->buffer_header! */
6986 
6987  pheader = NULL;
6988 
6989  /* we are not holding any locks from here down, but other threads cannot see this pbuf yet */
6990 
6993 
6994  /* add pbuf to buffer list */
6995 
6996  gBuffersMutex.lock();
6997 
6998  bool added = false;
6999  for (size_t i=0; i<gBuffers.size(); i++) {
7000  if (gBuffers[i] == NULL) {
7001  gBuffers[i] = pbuf;
7002  added = true;
7003  *buffer_handle = i+1;
7004  break;
7005  }
7006  }
7007  if (!added) {
7008  *buffer_handle = gBuffers.size() + 1;
7009  gBuffers.push_back(pbuf);
7010  }
7011 
7012  /* from here down we should not touch pbuf without locking it */
7013 
7014  pbuf = NULL;
7015 
7016  gBuffersMutex.unlock();
7017 
7018  /* new buffer is now ready for use */
7019 
7020  /* initialize buffer counters */
7021  bm_init_buffer_counters(*buffer_handle);
7022 
7023  bm_cleanup("bm_open_buffer", ss_millitime(), FALSE);
7024 
7025  if (shm_created)
7026  return BM_CREATED;
7027  }
7028 #endif /* LOCAL_ROUTINES */
7029 
7030  return BM_SUCCESS;
7031 }
static void bm_cleanup_buffer_locked(BUFFER *pbuf, const char *who, DWORD actual_time)
Definition: midas.cxx:6029
static DWORD _bm_max_event_size
Definition: midas.cxx:5910
static void bm_clear_buffer_statistics(HNDLE hDB, BUFFER *pbuf)
Definition: midas.cxx:6372
static void bm_reset_buffer_locked(BUFFER *pbuf)
Definition: midas.cxx:6355
INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout)
Definition: midas.cxx:3313
std::string cm_get_client_name()
Definition: midas.cxx:2061
static void bm_cleanup(const char *who, DWORD actual_time, BOOL wrong_interval)
Definition: midas.cxx:6115
#define BM_NO_SLOT
Definition: midas.h:616
#define BM_NO_SHM
Definition: midas.h:628
#define BM_CREATED
Definition: midas.h:612
#define BM_NO_SEMAPHORE
Definition: midas.h:617
#define SS_SUCCESS
Definition: midas.h:669
#define SS_CREATED
Definition: midas.h:670
#define SUCCESS
Definition: mcstd.h:54
INT ss_semaphore_create(const char *name, HNDLE *semaphore_handle)
Definition: system.cxx:2427
INT ss_getpid(void)
Definition: system.cxx:1383
INT ss_shm_open(const char *name, INT size, void **adr, size_t *shm_size, HNDLE *handle, BOOL get_size)
Definition: system.cxx:333
midas_thread_t ss_gettid(void)
Definition: system.cxx:1490
INT ss_suspend_get_buffer_port(midas_thread_t thread_id, INT *port)
Definition: system.cxx:4292
INT db_get_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, void *data, INT *buf_size, DWORD type, BOOL create)
Definition: odb.cxx:5405
#define RPC_BM_OPEN_BUFFER
Definition: mrpc.h:36
INT bm_init_buffer_counters(INT buffer_handle)
Definition: midas.cxx:8028
#define DEFAULT_MAX_EVENT_SIZE
Definition: midas.h:261
size_t EXPRT strlcpy(char *dst, const char *src, size_t size)
INT client_index
Definition: midas.h:995
int client_count_write_wait[MAX_CLIENTS]
Definition: midas.h:1028
DWORD client_time_write_wait[MAX_CLIENTS]
Definition: midas.h:1029
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_buffer_locked()

static int bm_peek_buffer_locked ( BUFFER pbuf,
BUFFER_HEADER pheader,
BUFFER_CLIENT pc,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8858 of file midas.cxx.

8859 {
8860  if (pc->read_pointer == pheader->write_pointer) {
8861  /* no more events buffered for this client */
8862  if (!pc->read_wait) {
8863  //printf("bm_peek_buffer_locked: buffer [%s] client [%s], set read_wait!\n", pheader->name, pc->name);
8864  pc->read_wait = TRUE;
8865  }
8866  return BM_ASYNC_RETURN;
8867  }
8868 
8869  if (pc->read_wait) {
8870  //printf("bm_peek_buffer_locked: buffer [%s] client [%s], clear read_wait!\n", pheader->name, pc->name);
8871  pc->read_wait = FALSE;
8872  }
8873 
8874  if ((pc->read_pointer < 0) || (pc->read_pointer >= pheader->size)) {
8875  cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d is invalid. buffer read pointer %d, write pointer %d, size %d", pheader->name, pc->name, pc->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8876  return BM_CORRUPTED;
8877  }
8878 
8879  char *pdata = (char *) (pheader + 1);
8880 
8881  EVENT_HEADER *pevent = (EVENT_HEADER *) (pdata + pc->read_pointer);
8882  int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8883  int total_size = ALIGN8(event_size);
8884 
8885  if ((total_size <= 0) || (total_size > pheader->size)) {
8886  cm_msg(MERROR, "bm_peek_buffer_locked", "event buffer \"%s\" is corrupted: client \"%s\" read pointer %d points to invalid event: data_size %d, event_size %d, total_size %d. buffer size: %d, read_pointer: %d, write_pointer: %d", pheader->name, pc->name, pc->read_pointer, pevent->data_size, event_size, total_size, pheader->size, pheader->read_pointer, pheader->write_pointer);
8887  return BM_CORRUPTED;
8888  }
8889 
8890  assert(total_size > 0);
8891  assert(total_size <= pheader->size);
8892 
8893  if (ppevent)
8894  *ppevent = pevent;
8895  if (pevent_size)
8896  *pevent_size = event_size;
8897  if (ptotal_size)
8898  *ptotal_size = total_size;
8899 
8900  return BM_SUCCESS;
8901 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_peek_read_cache_locked()

static BOOL bm_peek_read_cache_locked ( BUFFER pbuf,
EVENT_HEADER **  ppevent,
int *  pevent_size,
int *  ptotal_size 
)
static

Definition at line 8832 of file midas.cxx.

8833 {
8834  if (pbuf->read_cache_rp == pbuf->read_cache_wp)
8835  return FALSE;
8836 
8837  EVENT_HEADER *pevent = (EVENT_HEADER *) (pbuf->read_cache + pbuf->read_cache_rp);
8838  int event_size = pevent->data_size + sizeof(EVENT_HEADER);
8839  int total_size = ALIGN8(event_size);
8840 
8841  if (ppevent)
8842  *ppevent = pevent;
8843  if (pevent_size)
8844  *pevent_size = event_size;
8845  if (ptotal_size)
8846  *ptotal_size = total_size;
8847 
8848  return TRUE;
8849 }
Here is the caller graph for this function:

◆ bm_poll_event()

INT bm_poll_event ( void  )

Definition at line 11093 of file midas.cxx.

11107 {
11108  BOOL dispatched_something = FALSE;
11109 
11110  //printf("bm_poll_event!\n");
11111 
11112  DWORD start_time = ss_millitime();
11113 
11114  std::vector<char> vec;
11115 
11116  /* loop over all requests */
11117  _request_list_mutex.lock();
11118  bool locked = true;
11119  size_t n = _request_list.size();
11120  for (size_t i = 0; i < n; i++) {
11121  if (!locked) {
11122  _request_list_mutex.lock();
11123  locked = true;
11124  }
11125  /* continue if no dispatcher set (manual bm_receive_event) */
11126  if (_request_list[i].dispatcher == NULL)
11127  continue;
11128 
11129  int buffer_handle = _request_list[i].buffer_handle;
11130 
11131  /* must release the lock on the request list: user provided r.dispatcher() can add or remove event requests, and we will deadlock. K.O. */
11132  _request_list_mutex.unlock();
11133  locked = false;
11134 
11135  do {
11136  /* receive event */
11137  int status = bm_receive_event_vec(buffer_handle, &vec, BM_NO_WAIT);
11138 
11139  //printf("bm_poll_event: request_id %d, buffer_handle %d, bm_receive_event(BM_NO_WAIT) status %d, vec size %d, capacity %d\n", request_id, buffer_handle, status, (int)vec.size(), (int)vec.capacity());
11140 
11141  /* call user function if successful */
11142  if (status == BM_SUCCESS) {
11143  bm_dispatch_event(buffer_handle, (EVENT_HEADER*)vec.data());
11144  dispatched_something = TRUE;
11145  }
11146 
11147  /* break if no more events */
11148  if (status == BM_ASYNC_RETURN)
11149  break;
11150 
11151  /* break if corrupted event buffer */
11152  if (status == BM_TRUNCATED) {
11153  cm_msg(MERROR, "bm_poll_event", "received event was truncated, buffer size %d is too small, see messages and increase /Experiment/MAX_EVENT_SIZE in ODB", (int)vec.size());
11154  }
11155 
11156  /* break if corrupted event buffer */
11157  if (status == BM_CORRUPTED)
11158  return SS_ABORT;
11159 
11160  /* break if server died */
11161  if (status == RPC_NET_ERROR) {
11162  return SS_ABORT;
11163  }
11164 
11165  /* stop after one second */
11166  if (ss_millitime() - start_time > 1000) {
11167  break;
11168  }
11169 
11170  } while (TRUE);
11171  }
11172 
11173  if (locked)
11174  _request_list_mutex.unlock();
11175 
11176  if (dispatched_something)
11177  return BM_SUCCESS;
11178  else
11179  return BM_ASYNC_RETURN;
11180 }
INT bm_receive_event_vec(INT buffer_handle, std::vector< char > *pvec, int timeout_msec)
Definition: midas.cxx:10776
static void bm_dispatch_event(int buffer_handle, EVENT_HEADER *pevent)
Definition: midas.cxx:8790
#define BM_TRUNCATED
Definition: midas.h:620
#define RPC_NET_ERROR
Definition: midas.h:707
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_buffer()

static INT bm_push_buffer ( BUFFER pbuf,
int  buffer_handle 
)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_nameName of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, RPC_NET_ERROR

Definition at line 10869 of file midas.cxx.

10869  {
10870  //printf("bm_push_buffer: buffer [%s], handle %d, callback %d\n", pbuf->buffer_header->name, buffer_handle, pbuf->callback);
10871 
10872  /* return immediately if no callback routine is defined */
10873  if (!pbuf->callback)
10874  return BM_SUCCESS;
10875 
10876  return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, NULL, BM_NO_WAIT, 0, TRUE);
10877 }
static INT bm_read_buffer(BUFFER *pbuf, INT buffer_handle, void **bufptr, void *buf, INT *buf_size, std::vector< char > *vecptr, int timeout_msec, int convert_flags, BOOL dispatch)
Definition: midas.cxx:10231
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_push_event()

static INT bm_push_event ( const char *  buffer_name)
static

Check a buffer if an event is available and call the dispatch function if found.

Parameters
buffer_nameName of buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_TRUNCATED, BM_ASYNC_RETURN, BM_CORRUPTED, RPC_NET_ERROR

Definition at line 10885 of file midas.cxx.

10886 {
10887  std::vector<BUFFER*> mybuffers;
10888 
10889  gBuffersMutex.lock();
10890  mybuffers = gBuffers;
10891  gBuffersMutex.unlock();
10892 
10893  for (size_t i = 0; i < mybuffers.size(); i++) {
10894  BUFFER *pbuf = mybuffers[i];
10895  if (!pbuf || !pbuf->attached)
10896  continue;
10897  // FIXME: unlocked read access to pbuf->buffer_name!
10898  if (strcmp(buffer_name, pbuf->buffer_name) == 0) {
10899  return bm_push_buffer(pbuf, i + 1);
10900  }
10901  }
10902 
10903  return BM_INVALID_HANDLE;
10904 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_buffer()

static INT bm_read_buffer ( BUFFER pbuf,
INT  buffer_handle,
void **  bufptr,
void *  buf,
INT buf_size,
std::vector< char > *  vecptr,
int  timeout_msec,
int  convert_flags,
BOOL  dispatch 
)
static

Definition at line 10231 of file midas.cxx.

10231  {
10232  INT status = BM_SUCCESS;
10233 
10234  int max_size = 0;
10235  if (buf_size) {
10236  max_size = *buf_size;
10237  *buf_size = 0;
10238  }
10239 
10240  //printf("bm_read_buffer: [%s] timeout %d, conv %d, ptr %p, buf %p, disp %d\n", pbuf->buffer_name, timeout_msec, convert_flags, bufptr, buf, dispatch);
10241 
10242  bm_lock_buffer_guard pbuf_guard(pbuf, true); // buffer is not locked
10243 
10244  // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
10245 
10246  /* look if there is anything in the cache */
10247  if (pbuf->read_cache_size > 0) {
10248 
10250 
10251  if (status != BM_SUCCESS)
10252  return status;
10253 
10254  if (pbuf->read_cache_wp == 0) {
10255 
10256  // lock buffer for the first time
10257 
10258  if (!pbuf_guard.relock()) {
10259  pbuf->read_cache_mutex.unlock();
10260  return pbuf_guard.get_status();
10261  }
10262 
10263  status = bm_fill_read_cache_locked(pbuf_guard, timeout_msec);
10264  if (status != BM_SUCCESS) {
10265  // unlock in correct order
10266  if (pbuf_guard.is_locked()) {
10267  // check if bm_wait_for_more_events() failed to relock the buffer
10268  pbuf_guard.unlock();
10269  }
10270  pbuf->read_cache_mutex.unlock();
10271  return status;
10272  }
10273 
10274  // buffer remains locked here
10275  }
10276  EVENT_HEADER *pevent;
10277  int event_size;
10278  int total_size;
10279  if (bm_peek_read_cache_locked(pbuf, &pevent, &event_size, &total_size)) {
10280  if (pbuf_guard.is_locked()) {
10281  // do not need to keep the event buffer locked
10282  // when reading from the read cache
10283  pbuf_guard.unlock();
10284  }
10285  //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from cache %d %d %d\n", pbuf->buffer_name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10286  status = BM_SUCCESS;
10287  if (buf) {
10288  if (event_size > max_size) {
10289  cm_msg(MERROR, "bm_read_buffer", "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size, event_size, pbuf->buffer_name);
10290  event_size = max_size;
10291  status = BM_TRUNCATED;
10292  }
10293 
10294  memcpy(buf, pevent, event_size);
10295 
10296  if (buf_size) {
10297  *buf_size = event_size;
10298  }
10299  if (convert_flags) {
10300  bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10301  }
10302  } else if (bufptr) {
10303  *bufptr = malloc(event_size);
10304  memcpy(*bufptr, pevent, event_size);
10305  status = BM_SUCCESS;
10306  } else if (vecptr) {
10307  vecptr->resize(0);
10308  char* cptr = (char*)pevent;
10309  vecptr->assign(cptr, cptr+event_size);
10310  }
10311  bm_incr_read_cache_locked(pbuf, total_size);
10312  pbuf->read_cache_mutex.unlock();
10313  if (dispatch) {
10314  // FIXME need to protect currently dispatched event against
10315  // another thread overwriting it by refilling the read cache
10316  bm_dispatch_event(buffer_handle, pevent);
10317  return BM_MORE_EVENTS;
10318  }
10319  // buffer is unlocked here
10320  return status;
10321  }
10322  pbuf->read_cache_mutex.unlock();
10323  }
10324 
10325  /* we come here if the read cache is disabled */
10326  /* we come here if the next event is too big to fit into the read cache */
10327 
10328  if (!pbuf_guard.is_locked()) {
10329  if (!pbuf_guard.relock())
10330  return pbuf_guard.get_status();
10331  }
10332 
10333  EVENT_HEADER *event_buffer = NULL;
10334 
10335  BUFFER_HEADER *pheader = pbuf->buffer_header;
10336 
10337  BUFFER_CLIENT *pc = bm_get_my_client(pbuf, pheader);
10338 
10339  while (1) {
10340  /* loop over events in the event buffer */
10341 
10342  status = bm_wait_for_more_events_locked(pbuf_guard, pc, timeout_msec, FALSE);
10343 
10344  if (status != BM_SUCCESS) {
10345  // implicit unlock
10346  return status;
10347  }
10348 
10349  /* check if event at current read pointer matches a request */
10350 
10351  EVENT_HEADER *pevent;
10352  int event_size;
10353  int total_size;
10354 
10355  status = bm_peek_buffer_locked(pbuf, pheader, pc, &pevent, &event_size, &total_size);
10356  if (status == BM_CORRUPTED) {
10357  // implicit unlock
10358  return status;
10359  } else if (status != BM_SUCCESS) {
10360  /* event buffer is empty */
10361  break;
10362  }
10363 
10364  BOOL is_requested = bm_check_requests(pc, pevent);
10365 
10366  if (is_requested) {
10367  //printf("bm_read_buffer: [%s] async %d, conv %d, ptr %p, buf %p, disp %d, total_size %d, read from buffer, cache %d %d %d\n", pheader->name, async_flag, convert_flags, bufptr, buf, dispatch, total_size, pbuf->read_cache_size, pbuf->read_cache_rp, pbuf->read_cache_wp);
10368 
10369  status = BM_SUCCESS;
10370 
10371  if (buf) {
10372  if (event_size > max_size) {
10373  cm_msg(MERROR, "bm_read_buffer",
10374  "buffer size %d is smaller than event size %d, event truncated. buffer \"%s\"", max_size,
10375  event_size, pheader->name);
10376  event_size = max_size;
10377  status = BM_TRUNCATED;
10378  }
10379 
10380  bm_read_from_buffer_locked(pheader, pc->read_pointer, (char *) buf, event_size);
10381 
10382  if (buf_size) {
10383  *buf_size = event_size;
10384  }
10385 
10386  if (convert_flags) {
10387  bm_convert_event_header((EVENT_HEADER *) buf, convert_flags);
10388  }
10389 
10390  pbuf->count_read++;
10391  pbuf->bytes_read += event_size;
10392  } else if (dispatch || bufptr) {
10393  assert(event_buffer == NULL); // make sure we only come here once
10394  event_buffer = (EVENT_HEADER *) malloc(event_size);
10396  pbuf->count_read++;
10397  pbuf->bytes_read += event_size;
10398  } else if (vecptr) {
10399  bm_read_from_buffer_locked(pheader, pc->read_pointer, vecptr, event_size);
10400  pbuf->count_read++;
10401  pbuf->bytes_read += event_size;
10402  }
10403 
10404  int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10405  pc->read_pointer = new_read_pointer;
10406 
10407  pheader->num_out_events++;
10408  /* exit loop over events */
10409  break;
10410  }
10411 
10412  int new_read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
10413  pc->read_pointer = new_read_pointer;
10414  pheader->num_out_events++;
10415  }
10416 
10417  /*
10418  If read pointer has been changed, it may have freed up some space
10419  for waiting producers. So check if free space is now more than 50%
10420  of the buffer size and wake waiting producers.
10421  */
10422 
10423  bm_wakeup_producers_locked(pheader, pc);
10424 
10425  pbuf_guard.unlock();
10426 
10427  if (dispatch && event_buffer) {
10428  bm_dispatch_event(buffer_handle, event_buffer);
10429  free(event_buffer);
10430  event_buffer = NULL;
10431  return BM_MORE_EVENTS;
10432  }
10433 
10434  if (bufptr && event_buffer) {
10435  *bufptr = event_buffer;
10436  event_buffer = NULL;
10437  status = BM_SUCCESS;
10438  }
10439 
10440  if (event_buffer) {
10441  free(event_buffer);
10442  event_buffer = NULL;
10443  }
10444 
10445  return status;
10446 }
static void bm_convert_event_header(EVENT_HEADER *pevent, int convert_flags)
Definition: midas.cxx:9036
static int bm_fill_read_cache_locked(bm_lock_buffer_guard &pbuf_guard, int timeout_msec)
Definition: midas.cxx:8959
static BOOL bm_peek_read_cache_locked(BUFFER *pbuf, EVENT_HEADER **ppevent, int *pevent_size, int *ptotal_size)
Definition: midas.cxx:8832
static void bm_incr_read_cache_locked(BUFFER *pbuf, int total_size)
Definition: midas.cxx:8822
void * event_buffer
Definition: mfe.cxx:66
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [1/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
char *  buf,
int  event_size 
)
static

Definition at line 8903 of file midas.cxx.

8904 {
8905  const char *pdata = (const char *) (pheader + 1);
8906 
8907  if (rp + event_size <= pheader->size) {
8908  /* copy event to cache */
8909  memcpy(buf, pdata + rp, event_size);
8910  } else {
8911  /* event is splitted */
8912  int size = pheader->size - rp;
8913  memcpy(buf, pdata + rp, size);
8914  memcpy(buf + size, pdata, event_size - size);
8915  }
8916 }
Here is the caller graph for this function:

◆ bm_read_from_buffer_locked() [2/2]

static void bm_read_from_buffer_locked ( const BUFFER_HEADER pheader,
int  rp,
std::vector< char > *  vecptr,
int  event_size 
)
static

Definition at line 8918 of file midas.cxx.

8919 {
8920  const char *pdata = (const char *) (pheader + 1);
8921 
8922  if (rp + event_size <= pheader->size) {
8923  /* copy event to cache */
8924  vecptr->assign(pdata + rp, pdata + rp + event_size);
8925  } else {
8926  /* event is splitted */
8927  int size = pheader->size - rp;
8928  vecptr->assign(pdata + rp, pdata + rp + size);
8929  vecptr->insert(vecptr->end(), pdata, pdata + event_size - size);
8930  }
8931 }

◆ bm_receive_event()

INT bm_receive_event ( INT  buffer_handle,
void *  destination,
INT buf_size,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
INT bm_receive_event(INT buffer_handle, void *destination, INT *buf_size, int timeout_msec)
Definition: midas.cxx:10617
Parameters
buffer_handlebuffer handle
destinationdestination address where event is written to
buf_sizesize of destination buffer on input, size of event plus header on return.
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_TRUNCATED The event is larger than the destination buffer and was therefore truncated
BM_ASYNC_RETURN No event available

Definition at line 10617 of file midas.cxx.

10617  {
10618  //printf("bm_receive_event: handle %d, async %d\n", buffer_handle, async_flag);
10619  if (rpc_is_remote()) {
10620  return bm_receive_event_rpc(buffer_handle, destination, buf_size, NULL, NULL, timeout_msec);
10621  }
10622 #ifdef LOCAL_ROUTINES
10623  {
10624  INT status = BM_SUCCESS;
10625 
10626  BUFFER *pbuf = bm_get_buffer("bm_receive_event", buffer_handle, &status);
10627 
10628  if (!pbuf)
10629  return status;
10630 
10631  int convert_flags = rpc_get_convert_flags();
10632 
10633  status = bm_read_buffer(pbuf, buffer_handle, NULL, destination, buf_size, NULL, timeout_msec, convert_flags, FALSE);
10634  //printf("bm_receive_event: handle %d, async %d, status %d, size %d\n", buffer_handle, async_flag, status, *buf_size);
10635  return status;
10636  }
10637 #else /* LOCAL_ROUTINES */
10638 
10639  return BM_SUCCESS;
10640 #endif
10641 }
static INT bm_receive_event_rpc(INT buffer_handle, void *buf, int *buf_size, EVENT_HEADER **ppevent, std::vector< char > *pvec, int timeout_msec)
Definition: midas.cxx:10450
void rpc_get_convert_flags(INT *convert_flags)
Definition: midas.cxx:11573
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_alloc()

INT bm_receive_event_alloc ( INT  buffer_handle,
EVENT_HEADER **  ppevent,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handlebuffer handle
ppeventpointer to the received event pointer, event pointer should be free()ed to avoid memory leak
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10698 of file midas.cxx.

10698  {
10699  if (rpc_is_remote()) {
10700  return bm_receive_event_rpc(buffer_handle, NULL, NULL, ppevent, NULL, timeout_msec);
10701  }
10702 #ifdef LOCAL_ROUTINES
10703  {
10704  INT status = BM_SUCCESS;
10705 
10706  BUFFER *pbuf = bm_get_buffer("bm_receive_event_alloc", buffer_handle, &status);
10707 
10708  if (!pbuf)
10709  return status;
10710 
10711  int convert_flags = rpc_get_convert_flags();
10712 
10713  return bm_read_buffer(pbuf, buffer_handle, (void **) ppevent, NULL, NULL, NULL, timeout_msec, convert_flags, FALSE);
10714  }
10715 #else /* LOCAL_ROUTINES */
10716 
10717  return BM_SUCCESS;
10718 #endif
10719 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_rpc()

static INT bm_receive_event_rpc ( INT  buffer_handle,
void *  buf,
int *  buf_size,
EVENT_HEADER **  ppevent,
std::vector< char > *  pvec,
int  timeout_msec 
)
static

Definition at line 10450 of file midas.cxx.

10451 {
10452  //printf("bm_receive_event_rpc: handle %d, buf %p, pevent %p, pvec %p, timeout %d, max_event_size %d\n", buffer_handle, buf, ppevent, pvec, timeout_msec, _bm_max_event_size);
10453 
10454  assert(_bm_max_event_size > sizeof(EVENT_HEADER));
10455 
10456  void *xbuf = NULL;
10457  int xbuf_size = 0;
10458 
10459  if (buf) {
10460  xbuf = buf;
10461  xbuf_size = *buf_size;
10462  } else if (ppevent) {
10463  *ppevent = (EVENT_HEADER*)malloc(_bm_max_event_size);
10464  xbuf_size = _bm_max_event_size;
10465  } else if (pvec) {
10466  pvec->resize(_bm_max_event_size);
10467  xbuf = pvec->data();
10468  xbuf_size = pvec->size();
10469  } else {
10470  assert(!"incorrect call to bm_receivent_event_rpc()");
10471  }
10472 
10473  int status;
10474  DWORD time_start = ss_millitime();
10475  DWORD time_end = time_start + timeout_msec;
10476 
10477  int xtimeout_msec = timeout_msec;
10478 
10479  int zbuf_size = xbuf_size;
10480 
10481  while (1) {
10482  if (timeout_msec == BM_WAIT) {
10483  xtimeout_msec = 1000;
10484  } else if (timeout_msec == BM_NO_WAIT) {
10485  xtimeout_msec = BM_NO_WAIT;
10486  } else {
10487  if (xtimeout_msec > 1000) {
10488  xtimeout_msec = 1000;
10489  }
10490  }
10491 
10492  zbuf_size = xbuf_size;
10493 
10494  status = rpc_call(RPC_BM_RECEIVE_EVENT, buffer_handle, xbuf, &zbuf_size, xtimeout_msec);
10495 
10496  //printf("bm_receive_event_rpc: handle %d, timeout %d, status %d, size %d in, %d out, via RPC_BM_RECEIVE_EVENT\n", buffer_handle, xtimeout_msec, status, xbuf_size, zbuf_size);
10497 
10498  if (status == BM_ASYNC_RETURN) {
10499  if (timeout_msec == BM_WAIT) {
10500  // BM_WAIT means wait forever
10501  continue;
10502  } else if (timeout_msec == BM_NO_WAIT) {
10503  // BM_NO_WAIT means do not wait
10504  break;
10505  } else {
10506  DWORD now = ss_millitime();
10507  if (now >= time_end) {
10508  // timeout, return BM_ASYNC_RETURN
10509  break;
10510  }
10511 
10512  DWORD remain = time_end - now;
10513 
10514  if (remain < (DWORD)xtimeout_msec) {
10515  xtimeout_msec = remain;
10516  }
10517 
10518  // keep asking for event...
10519  continue;
10520  }
10521  } else if (status == BM_SUCCESS) {
10522  // success, return BM_SUCCESS
10523  break;
10524  }
10525 
10526  // RPC error
10527 
10528  if (buf) {
10529  *buf_size = 0;
10530  } else if (ppevent) {
10531  free(*ppevent);
10532  *ppevent = NULL;
10533  } else if (pvec) {
10534  pvec->resize(0);
10535  } else {
10536  assert(!"incorrect call to bm_receivent_event_rpc()");
10537  }
10538 
10539  return status;
10540  }
10541 
10542  // status is BM_SUCCESS or BM_ASYNC_RETURN
10543 
10544  if (buf) {
10545  *buf_size = zbuf_size;
10546  } else if (ppevent) {
10547  // nothing to do
10548  // ppevent = realloc(ppevent, xbuf_size); // shrink memory allocation
10549  } else if (pvec) {
10550  pvec->resize(zbuf_size);
10551  } else {
10552  assert(!"incorrect call to bm_receivent_event_rpc()");
10553  }
10554 
10555  return status;
10556 }
#define RPC_BM_RECEIVE_EVENT
Definition: mrpc.h:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_receive_event_vec()

INT bm_receive_event_vec ( INT  buffer_handle,
std::vector< char > *  pvec,
int  timeout_msec 
)

Receives events directly. This function is an alternative way to receive events without a main loop.

It can be used in analysis systems which actively receive events, rather than using callbacks. An analysis package could for example contain its own command line interface. A command like "receive 1000 events" could make it necessary to call bm_receive_event() 1000 times in a row to receive these events and then return back to the command line prompt. The corresponding bm_request_event() call contains NULL as the callback routine to indicate that bm_receive_event() is called to receive events.

#include <stdio.h>
#include "midas.h"
{
printf("Received event #%d\r",
pheader->serial_number);
}
{
INT status, request_id;
HNDLE hbuf;
char event_buffer[1000];
status = cm_connect_experiment("", "Sample",
"Simple Analyzer", NULL);
return 1;
bm_request_event(hbuf, 1, TRIGGER_ALL, GET_ALL, request_id, NULL);
do
{
size = sizeof(event_buffer);
<...do something else...>
} while (status != RPC_SHUTDOWN &&
return 0;
}
Parameters
buffer_handlebuffer handle
pvecpointer to a std::vector<char> which is resized to hold the received event data
timeout_msecWait so many millisecond for new data. Special values: BM_WAIT: wait forever, BM_NO_WAIT: do not wait, return BM_ASYNC_RETURN if no data is immediately available
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_ASYNC_RETURN No event available

Definition at line 10776 of file midas.cxx.

10776  {
 /* remote connection: receive the event via RPC directly into the vector */
10777  if (rpc_is_remote()) {
10778  return bm_receive_event_rpc(buffer_handle, NULL, NULL, NULL, pvec, timeout_msec);
10779  }
10780 #ifdef LOCAL_ROUTINES
10781  {
10782  INT status = BM_SUCCESS;
10783 
 /* map the buffer handle to the local BUFFER object; on failure, status holds the error code */
10784  BUFFER *pbuf = bm_get_buffer("bm_receive_event_vec", buffer_handle, &status);
10785 
10786  if (!pbuf)
10787  return status;
10788 
 /* byte-order conversion flags in case the event was produced by a remote client */
10789  int convert_flags = rpc_get_convert_flags();
10790 
 /* only the pvec output channel is used; buf/buf_size/ppevent channels are NULL */
10791  return bm_read_buffer(pbuf, buffer_handle, NULL, NULL, NULL, pvec, timeout_msec, convert_flags, FALSE);
10792  }
10793 #else /* LOCAL_ROUTINES */
10794  return BM_SUCCESS;
10795 #endif
10796 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_client_locked()

void bm_remove_client_locked ( BUFFER_HEADER pheader,
int  j 
)

Called to forcibly disconnect given client from a data buffer

Definition at line 5998 of file midas.cxx.

5998  {
5999  int k, nc;
6000  BUFFER_CLIENT *pbctmp;
6001 
6002  /* clear entry from client structure in buffer header */
6003  memset(&(pheader->client[j]), 0, sizeof(BUFFER_CLIENT));
6004 
6005  /* calculate new max_client_index entry */
 /* max_client_index is one past the highest occupied slot (pid != 0) */
6006  for (k = MAX_CLIENTS - 1; k >= 0; k--)
6007  if (pheader->client[k].pid != 0)
6008  break;
6009  pheader->max_client_index = k + 1;
6010 
6011  /* count new number of clients */
6012  for (k = MAX_CLIENTS - 1, nc = 0; k >= 0; k--)
6013  if (pheader->client[k].pid != 0)
6014  nc++;
6015  pheader->num_clients = nc;
6016 
6017  /* check if anyone is waiting and wake him up */
 /* NOTE(review): "B " appears to be the buffer wakeup message tag — confirm against ss_suspend()/ss_resume() */
6018  pbctmp = pheader->client;
6019 
6020  for (k = 0; k < pheader->max_client_index; k++, pbctmp++)
6021  if (pbctmp->pid && (pbctmp->write_wait || pbctmp->read_wait))
6022  ss_resume(pbctmp->port, "B ");
6023 }
INT k
Definition: odbhist.cxx:40
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_remove_event_request()

INT bm_remove_event_request ( INT  buffer_handle,
INT  request_id 
)

Delete a previously placed request for a specific event type in the client structure of the buffer referenced by buffer_handle.

Parameters
buffer_handleHandle to the buffer where the request should be placed in
request_idRequest id returned by bm_request_event
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NOT_FOUND, RPC_NET_ERROR

Definition at line 8484 of file midas.cxx.

8484  {
 /* remote connection: forward the request deletion over RPC */
8485  if (rpc_is_remote())
8486  return rpc_call(RPC_BM_REMOVE_EVENT_REQUEST, buffer_handle, request_id);
8487 
8488 #ifdef LOCAL_ROUTINES
8489  {
8490  int status = 0;
8491 
8492  BUFFER *pbuf = bm_get_buffer("bm_remove_event_request", buffer_handle, &status);
8493 
8494  if (!pbuf)
8495  return status;
8496 
8497  /* lock buffer */
8498  bm_lock_buffer_guard pbuf_guard(pbuf);
8499 
8500  if (!pbuf_guard.is_locked())
8501  return pbuf_guard.get_status();
8502 
8503  INT i, deleted;
8504 
8505  /* get a pointer to the proper client structure */
8506  BUFFER_HEADER *pheader = pbuf->buffer_header;
8507  BUFFER_CLIENT *pclient = bm_get_my_client(pbuf, pheader);
8508 
8509  /* check all requests and set to zero if matching */
8510  for (i = 0, deleted = 0; i < pclient->max_request_index; i++)
8511  if (pclient->event_request[i].valid && pclient->event_request[i].id == request_id) {
8512  memset(&pclient->event_request[i], 0, sizeof(EVENT_REQUEST));
8513  deleted++;
8514  }
8515 
8516  /* calculate new max_request_index entry */
8517  for (i = MAX_EVENT_REQUESTS - 1; i >= 0; i--)
8518  if (pclient->event_request[i].valid)
8519  break;
8520 
8521  pclient->max_request_index = i + 1;
8522 
8523  /* calculate new all_flag */
 /* all_flag is TRUE if any remaining request uses GET_ALL sampling */
8524  pclient->all_flag = FALSE;
8525 
8526  for (i = 0; i < pclient->max_request_index; i++)
8527  if (pclient->event_request[i].valid && (pclient->event_request[i].sampling_type & GET_ALL)) {
8528  pclient->all_flag = TRUE;
8529  break;
8530  }
8531 
 /* mirror the recomputed flag into the local BUFFER object */
8532  pbuf->get_all_flag = pclient->all_flag;
8533 
8534  if (!deleted)
8535  return BM_NOT_FOUND;
8536  }
8537 #endif /* LOCAL_ROUTINES */
8538 
8539  return BM_SUCCESS;
8540 }
#define RPC_BM_REMOVE_EVENT_REQUEST
Definition: mrpc.h:44
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_request_event()

INT bm_request_event ( HNDLE  buffer_handle,
short int  event_id,
short int  trigger_mask,
INT  sampling_type,
HNDLE request_id,
EVENT_HANDLER func 
)

Place an event request based on certain characteristics. Multiple event requests can be placed for each buffer, which are later identified by their request ID. They can contain different callback routines. Example see bm_open_buffer() and bm_receive_event()

Parameters
buffer_handlebuffer handle obtained via bm_open_buffer()
event_idevent ID for requested events. Use EVENTID_ALL to receive events with any ID.
trigger_masktrigger mask for requested events. The requested events must have at least one bit in its trigger mask common with the requested trigger mask. Use TRIGGER_ALL to receive events with any trigger mask.
sampling_typespecifies how many events to receive. A value of GET_ALL receives all events which match the specified event ID and trigger mask. If the events are consumed slower than produced, the producer is automatically slowed down. A value of GET_NONBLOCKING receives as many events as possible without slowing down the producer. GET_ALL is typically used by the logger, while GET_NONBLOCKING is typically used by analyzers.
request_idrequest ID returned by the function. This ID is passed to the callback routine and must be used in the bm_delete_request() routine.
funccallback routine which gets called when an event of the specified type is received.
Returns
BM_SUCCESS, BM_INVALID_HANDLE
BM_NO_MEMORY too many requests. The value MAX_EVENT_REQUESTS in midas.h should be increased.

Definition at line 8431 of file midas.cxx.

8435 {
8436  assert(request_id != NULL);
8437 
8438  EventRequest r;
8439  r.buffer_handle = buffer_handle;
8440  r.event_id = event_id;
8442  r.dispatcher = func;
8443 
8444  {
8445  std::lock_guard<std::mutex> guard(_request_list_mutex);
8446 
8447  bool found = false;
8448 
8449  // find deleted entry
8450  for (size_t i = 0; i < _request_list.size(); i++) {
8451  if (_request_list[i].buffer_handle == 0) {
8452  _request_list[i] = r;
8453  *request_id = i;
8454  found = true;
8455  break;
8456  }
8457  }
8458 
8459  if (!found) { // not found
8460  *request_id = _request_list.size();
8461  _request_list.push_back(r);
8462  }
8463 
8464  // implicit unlock()
8465  }
8466 
8467  /* add request in buffer structure */
8468  int status = bm_add_event_request(buffer_handle, event_id, trigger_mask, sampling_type, func, *request_id);
8469  if (status != BM_SUCCESS)
8470  return status;
8471 
8472  return BM_SUCCESS;
8473 }
INT bm_add_event_request(INT buffer_handle, short int event_id, short int trigger_mask, INT sampling_type, EVENT_HANDLER *func, INT request_id)
Definition: midas.cxx:8279
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_reset_buffer_locked()

static void bm_reset_buffer_locked ( BUFFER pbuf)
static

Definition at line 6355 of file midas.cxx.

6355  {
6356  BUFFER_HEADER *pheader = pbuf->buffer_header;
6357 
6358  //printf("bm_reset_buffer: buffer \"%s\"\n", pheader->name);
6359 
 /* discard all buffered events: reset the global read and write pointers... */
6360  pheader->read_pointer = 0;
6361  pheader->write_pointer = 0;
6362 
 /* ...and the private read pointer of every attached client (pid != 0) */
6363  int i;
6364  for (i = 0; i < pheader->max_client_index; i++) {
6365  BUFFER_CLIENT *pc = pheader->client + i;
6366  if (pc->pid) {
6367  pc->read_pointer = 0;
6368  }
6369  }
6370 }
Here is the caller graph for this function:

◆ bm_send_event()

INT bm_send_event ( INT  buffer_handle,
const EVENT_HEADER pevent,
int  unused,
int  timeout_msec 
)

Definition at line 9645 of file midas.cxx.

9646 {
 /* validate the event size, then forward the event as a single scatter-gather
  * fragment; the "unused" parameter is kept for backward compatibility only */
9647  const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9648  const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9649 
9650  if (data_size == 0) {
9651  cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9652  return BM_INVALID_SIZE;
9653  }
9654 
9655  if (data_size > MAX_DATA_SIZE) {
9656  cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9657  return BM_INVALID_SIZE;
9658  }
9659 
9660  const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9661 
9662  //printf("bm_send_event: pevent %p, data_size %d, event_size %d, buf_size %d\n", pevent, data_size, event_size, unused);
9663 
9664  if (rpc_is_remote()) {
9665  //return bm_send_event_rpc(buffer_handle, pevent, event_size, timeout_msec);
9666  return rpc_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size);
9667  } else {
9668  return bm_send_event_sg(buffer_handle, 1, (char**)&pevent, &event_size, timeout_msec);
9669  }
9670 }
int bm_send_event_sg(int buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[], int timeout_msec)
Definition: midas.cxx:9745
#define BM_INVALID_SIZE
Definition: midas.h:630
INT rpc_send_event_sg(INT buffer_handle, int sg_n, const char *const sg_ptr[], const size_t sg_len[])
Definition: midas.cxx:13892
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_sg()

int bm_send_event_sg ( int  buffer_handle,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
int  timeout_msec 
)

Sends an event to a buffer. This function checks if the buffer has enough space for the event, then copies the event to the buffer in shared memory. If clients have requests for the event, they are notified via an UDP packet.

char event[1000];
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set first byte of event
*(event+sizeof(EVENT_HEADER)) = <...>
#include <stdio.h>
#include "midas.h"
{
HNDLE hbuf;
char event[1000];
status = cm_connect_experiment("", "Sample", "Producer", NULL);
return 1;
// create event with ID 1, trigger mask 0, size 100 bytes and serial number 1
bm_compose_event((EVENT_HEADER *) event, 1, 0, 100, 1);
// set event data
for (i=0 ; i<100 ; i++)
*(event+sizeof(EVENT_HEADER)+i) = i;
// send event
bm_send_event(hbuf, event, 100+sizeof(EVENT_HEADER), BM_WAIT);
return 0;
}
INT bm_send_event(INT buffer_handle, const EVENT_HEADER *pevent, int unused, int timeout_msec)
Definition: midas.cxx:9645
Parameters
buffer_handleBuffer handle obtained via bm_open_buffer()
sg_ptrArray of sg_n pointers to the event fragments; sg_ptr[0] must point to the event header
sg_lenArray of sg_n fragment sizes in bytes; their sum must equal the event size including the event header
timeout_msecTimeout waiting for free space in the event buffer. If BM_WAIT, wait forever. If BM_NO_WAIT, the function returns immediately with a value of BM_ASYNC_RETURN without writing the event to the buffer
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_INVALID_PARAM
BM_ASYNC_RETURN Routine called with timeout_msec == BM_NO_WAIT and buffer has not enough space to receive event
BM_NO_MEMORY Event is too large for network buffer or event buffer. One has to increase the event buffer size "/Experiment/Buffer sizes/SYSTEM" and/or /Experiment/MAX_EVENT_SIZE in ODB.

Definition at line 9745 of file midas.cxx.

9746 {
9747  if (rpc_is_remote())
9748  return rpc_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len);
9749 
9750  if (sg_n < 1) {
9751  cm_msg(MERROR, "bm_send_event", "invalid sg_n %d", sg_n);
9752  return BM_INVALID_SIZE;
9753  }
9754 
9755  if (sg_ptr[0] == NULL) {
9756  cm_msg(MERROR, "bm_send_event", "invalid sg_ptr[0] is NULL");
9757  return BM_INVALID_SIZE;
9758  }
9759 
9760  if (sg_len[0] < sizeof(EVENT_HEADER)) {
9761  cm_msg(MERROR, "bm_send_event", "invalid sg_len[0] value %d is smaller than event header size %d", (int)sg_len[0], (int)sizeof(EVENT_HEADER));
9762  return BM_INVALID_SIZE;
9763  }
9764 
9765  const EVENT_HEADER* pevent = (const EVENT_HEADER*)sg_ptr[0];
9766 
9767  const DWORD MAX_DATA_SIZE = (0x7FFFFFF0 - 16); // event size computations are not 32-bit clean, limit event size to 2GB. K.O.
9768  const DWORD data_size = pevent->data_size; // 32-bit unsigned value
9769 
9770  if (data_size == 0) {
9771  cm_msg(MERROR, "bm_send_event", "invalid event data size zero");
9772  return BM_INVALID_SIZE;
9773  }
9774 
9775  if (data_size > MAX_DATA_SIZE) {
9776  cm_msg(MERROR, "bm_send_event", "invalid event data size %d (0x%x) maximum is %d (0x%x)", data_size, data_size, MAX_DATA_SIZE, MAX_DATA_SIZE);
9777  return BM_INVALID_SIZE;
9778  }
9779 
9780  const size_t event_size = sizeof(EVENT_HEADER) + data_size;
9781  const size_t total_size = ALIGN8(event_size);
9782 
9783  size_t count = 0;
9784  for (int i=0; i<sg_n; i++) {
9785  count += sg_len[i];
9786  }
9787 
9788  if (count != event_size) {
9789  cm_msg(MERROR, "bm_send_event", "data size mismatch: event data_size %d, event_size %d not same as sum of sg_len %d", (int)data_size, (int)event_size, (int)count);
9790  return BM_INVALID_SIZE;
9791  }
9792 
9793  //printf("bm_send_event_sg: pevent %p, event_id 0x%04x, serial 0x%08x, data_size %d, event_size %d, total_size %d\n", pevent, pevent->event_id, pevent->serial_number, (int)pevent->data_size, (int)event_size, (int)total_size);
9794 
9795 #ifdef LOCAL_ROUTINES
9796  {
9797  int status = 0;
9798 
9799  BUFFER *pbuf = bm_get_buffer("bm_send_event_sg", buffer_handle, &status);
9800 
9801  if (!pbuf)
9802  return status;
9803 
9804  /* round up total_size to next DWORD boundary */
9805  //int total_size = ALIGN8(event_size);
9806 
9807  /* check if write cache is enabled */
9808  if (pbuf->write_cache_size) {
9810 
9811  if (status != BM_SUCCESS)
9812  return status;
9813 
9814  /* check if write cache is enabled */
9815  if (pbuf->write_cache_size) {
9817  bool too_big = event_size > max_event_size;
9818 
9819  //printf("bm_send_event: write %zu/%zu max %zu, cache size %zu, wp %zu\n", event_size, total_size, max_event_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9820 
9821  /* if this event does not fit into the write cache, flush the write cache */
9822  if (pbuf->write_cache_wp > 0 && (pbuf->write_cache_wp + total_size > pbuf->write_cache_size || too_big)) {
9823  //printf("bm_send_event: write %zu/%zu but cache is full, size %zu, wp %zu\n", event_size, total_size, pbuf->write_cache_size.load(), pbuf->write_cache_wp);
9824 
9825  bm_lock_buffer_guard pbuf_guard(pbuf);
9826 
9827  if (!pbuf_guard.is_locked()) {
9828  pbuf->write_cache_mutex.unlock();
9829  return pbuf_guard.get_status();
9830  }
9831 
9832  int status = bm_flush_cache_locked(pbuf_guard, timeout_msec);
9833 
9834  if (pbuf_guard.is_locked()) {
9835  // check if bm_wait_for_free_space() failed to relock the buffer
9836  pbuf_guard.unlock();
9837  }
9838 
9839  if (status != BM_SUCCESS) {
9840  pbuf->write_cache_mutex.unlock();
9841  // bm_flush_cache() failed: timeout in bm_wait_for_free_space() or write cache size is bigger than buffer size or buffer was closed.
9842  if (status == BM_NO_MEMORY)
9843  cm_msg(MERROR, "bm_send_event", "write cache size is bigger than buffer size");
9844  return status;
9845  }
9846 
9847  // write cache must be empty here
9848  assert(pbuf->write_cache_wp == 0);
9849  }
9850 
9851  /* write this event into the write cache, if it is not too big and if it fits */
9852  if (!too_big && pbuf->write_cache_wp + total_size <= pbuf->write_cache_size) {
9853  //printf("bm_send_event: write %d/%d to cache size %d, wp %d\n", (int)event_size, (int)total_size, (int)pbuf->write_cache_size, (int)pbuf->write_cache_wp);
9854 
9855  char* wptr = pbuf->write_cache + pbuf->write_cache_wp;
9856 
9857  for (int i=0; i<sg_n; i++) {
9858  memcpy(wptr, sg_ptr[i], sg_len[i]);
9859  wptr += sg_len[i];
9860  }
9861 
9862  pbuf->write_cache_wp += total_size;
9863 
9864  pbuf->write_cache_mutex.unlock();
9865  return BM_SUCCESS;
9866  }
9867  }
9868 
9869  /* event did not fit into the write cache, we flushed the write cache and we send it directly to shared memory */
9870  pbuf->write_cache_mutex.unlock();
9871  }
9872 
9873  /* we come here only for events that are too big to fit into the cache */
9874 
9875  /* lock the buffer */
9876  bm_lock_buffer_guard pbuf_guard(pbuf);
9877 
9878  if (!pbuf_guard.is_locked()) {
9879  return pbuf_guard.get_status();
9880  }
9881 
9882  /* calculate some shorthands */
9883  BUFFER_HEADER *pheader = pbuf->buffer_header;
9884 
9885 #if 0
9887  if (status != BM_SUCCESS) {
9888  printf("bm_send_event: corrupted 111!\n");
9889  abort();
9890  }
9891 #endif
9892 
9893  /* check if buffer is large enough */
9894  if (total_size >= (size_t)pheader->size) {
9895  pbuf_guard.unlock(); // unlock before cm_msg()
9896  cm_msg(MERROR, "bm_send_event", "total event size (%d) larger than size (%d) of buffer \'%s\'", (int)total_size, pheader->size, pheader->name);
9897  return BM_NO_MEMORY;
9898  }
9899 
9900  status = bm_wait_for_free_space_locked(pbuf_guard, timeout_msec, total_size, false);
9901 
9902  if (status != BM_SUCCESS) {
9903  // implicit unlock
9904  return status;
9905  }
9906 
9907 #if 0
9909  if (status != BM_SUCCESS) {
9910  printf("bm_send_event: corrupted 222!\n");
9911  abort();
9912  }
9913 #endif
9914 
9915  int old_write_pointer = pheader->write_pointer;
9916 
9917  bm_write_to_buffer_locked(pheader, sg_n, sg_ptr, sg_len, total_size);
9918 
9919  /* write pointer was incremented, but there should
9920  * always be some free space in the buffer and the
9921  * write pointer should never cacth up to the read pointer:
9922  * the rest of the code gets confused this happens (buffer 100% full)
9923  * as it is write_pointer == read_pointer can be either
9924  * 100% full or 100% empty. My solution: never fill
9925  * the buffer to 100% */
9926  assert(pheader->write_pointer != pheader->read_pointer);
9927 
9928  /* send wake up messages to all clients that want this event */
9929  int i;
9930  for (i = 0; i < pheader->max_client_index; i++) {
9931  BUFFER_CLIENT *pc = pheader->client + i;
9932  int request_id = bm_find_first_request_locked(pc, pevent);
9933  bm_notify_reader_locked(pheader, pc, old_write_pointer, request_id);
9934  }
9935 
9936 #if 0
9938  if (status != BM_SUCCESS) {
9939  printf("bm_send_event: corrupted 333!\n");
9940  abort();
9941  }
9942 #endif
9943 
9944  /* update statistics */
9945  pheader->num_in_events++;
9946  pbuf->count_sent += 1;
9947  pbuf->bytes_sent += total_size;
9948  }
9949 #endif /* LOCAL_ROUTINES */
9950 
9951  return BM_SUCCESS;
9952 }
double count
Definition: mdump.cxx:36
INT max_event_size
Definition: mfed.cxx:29
#define MAX_WRITE_CACHE_EVENT_SIZE_DIV
Definition: midas.h:266
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_send_event_vec() [1/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< char > &  event,
int  timeout_msec 
)

Definition at line 9672 of file midas.cxx.

9673 {
 /* wrap the contiguous vector as a single scatter-gather fragment */
9674  const char* cptr = event.data();
9675  size_t clen = event.size();
9676  return bm_send_event_sg(buffer_handle, 1, &cptr, &clen, timeout_msec);
9677 }
Here is the call graph for this function:

◆ bm_send_event_vec() [2/2]

int bm_send_event_vec ( int  buffer_handle,
const std::vector< std::vector< char >> &  event,
int  timeout_msec 
)

Definition at line 9679 of file midas.cxx.

9680 {
 /* build scatter-gather pointer/length arrays, one entry per event fragment */
9681  int sg_n = event.size();
 /* NOTE(review): variable-length arrays are a compiler extension in C++ — consider std::vector here */
9682  const char* sg_ptr[sg_n];
9683  size_t sg_len[sg_n];
9684  for (int i=0; i<sg_n; i++) {
9685  sg_ptr[i] = event[i].data();
9686  sg_len[i] = event[i].size();
9687  }
9688  return bm_send_event_sg(buffer_handle, sg_n, sg_ptr, sg_len, timeout_msec);
9689 }
Here is the call graph for this function:

◆ bm_set_cache_size()

INT bm_set_cache_size ( INT  buffer_handle,
size_t  read_size,
size_t  write_size 
)

Modifies buffer cache size. Without a buffer cache, events are copied to/from the shared memory event by event.

To protect processes from accessing the shared memory simultaneously, semaphores are used. Since semaphore operations are CPU consuming (typically 50-100us) this can slow down the data transfer especially for small events. By using a cache the number of semaphore operations is reduced dramatically. Instead of writing directly to the shared memory, the events are copied to a local cache buffer. When this buffer is full, it is copied to the shared memory in one operation. The same technique can be used when receiving events.

The drawback of this method is that the events have to be copied twice, once to the cache and once from the cache to the shared memory. Therefore it can happen that the usage of a cache even slows down data throughput on a given environment (computer type, OS type, event size). The cache size has therefore to be optimized manually to maximize data throughput.

Parameters
buffer_handlebuffer handle obtained via bm_open_buffer()
read_sizecache size for reading events in bytes, zero for no cache
write_sizecache size for writing events in bytes, zero for no cache
Returns
BM_SUCCESS, BM_INVALID_HANDLE, BM_NO_MEMORY, BM_INVALID_PARAM

Definition at line 8105 of file midas.cxx.

8107 {
8108  if (rpc_is_remote())
8109  return rpc_call(RPC_BM_SET_CACHE_SIZE, buffer_handle, read_size, write_size);
8110 
8111 #ifdef LOCAL_ROUTINES
8112  {
8113  int status = 0;
8114 
8115  BUFFER *pbuf = bm_get_buffer("bm_set_cache_size", buffer_handle, &status);
8116 
8117  if (!pbuf)
8118  return status;
8119 
8120  /* lock pbuf for local access. we do not lock buffer semaphore because we do not touch the shared memory */
8121 
8122  status = bm_lock_buffer_mutex(pbuf);
8123 
8124  if (status != BM_SUCCESS)
8125  return status;
8126 
8127  if (write_size < 0)
8128  write_size = 0;
8129 
8130  if (write_size > 0) {
8131  if (write_size < MIN_WRITE_CACHE_SIZE) {
8132  cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" too small, will use minimum size %d", write_size, pbuf->buffer_name, MIN_WRITE_CACHE_SIZE);
8133  write_size = MIN_WRITE_CACHE_SIZE;
8134  }
8135  }
8136 
8137  size_t max_write_size = pbuf->buffer_header->size/MAX_WRITE_CACHE_SIZE_DIV;
8138 
8139  if (write_size > max_write_size) {
8140  size_t new_write_size = max_write_size;
8141  cm_msg(MERROR, "bm_set_cache_size", "requested write cache size %zu on buffer \"%s\" is too big: buffer size is %d, write cache size will be %zu bytes", write_size, pbuf->buffer_name, pbuf->buffer_header->size, new_write_size);
8142  write_size = new_write_size;
8143  }
8144 
8145  pbuf->buffer_mutex.unlock();
8146 
8147  /* resize read cache */
8148 
8150 
8151  if (status != BM_SUCCESS) {
8152  return status;
8153  }
8154 
8155  if (pbuf->read_cache_size > 0) {
8156  free(pbuf->read_cache);
8157  pbuf->read_cache = NULL;
8158  }
8159 
8160  if (read_size > 0) {
8161  pbuf->read_cache = (char *) malloc(read_size);
8162  if (pbuf->read_cache == NULL) {
8163  pbuf->read_cache_size = 0;
8164  pbuf->read_cache_rp = 0;
8165  pbuf->read_cache_wp = 0;
8166  pbuf->read_cache_mutex.unlock();
8167  cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate read cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, read_size);
8168  return BM_NO_MEMORY;
8169  }
8170  }
8171 
8172  pbuf->read_cache_size = read_size;
8173  pbuf->read_cache_rp = 0;
8174  pbuf->read_cache_wp = 0;
8175 
8176  pbuf->read_cache_mutex.unlock();
8177 
8178  /* resize the write cache */
8179 
8181 
8182  if (status != BM_SUCCESS)
8183  return status;
8184 
8185  // FIXME: should flush the write cache!
8186  if (pbuf->write_cache_size && pbuf->write_cache_wp > 0) {
8187  cm_msg(MERROR, "bm_set_cache_size", "buffer \"%s\" lost %zu bytes from the write cache", pbuf->buffer_name, pbuf->write_cache_wp);
8188  }
8189 
8190  /* manage write cache */
8191  if (pbuf->write_cache_size > 0) {
8192  free(pbuf->write_cache);
8193  pbuf->write_cache = NULL;
8194  }
8195 
8196  if (write_size > 0) {
8197  pbuf->write_cache = (char *) M_MALLOC(write_size);
8198  if (pbuf->write_cache == NULL) {
8199  pbuf->write_cache_size = 0;
8200  pbuf->write_cache_rp = 0;
8201  pbuf->write_cache_wp = 0;
8202  pbuf->write_cache_mutex.unlock();
8203  cm_msg(MERROR, "bm_set_cache_size", "not enough memory to allocate write cache for buffer \"%s\", malloc(%zu) failed", pbuf->buffer_name, write_size);
8204  return BM_NO_MEMORY;
8205  }
8206  }
8207 
8208  pbuf->write_cache_size = write_size;
8209  pbuf->write_cache_rp = 0;
8210  pbuf->write_cache_wp = 0;
8211 
8212  pbuf->write_cache_mutex.unlock();
8213  }
8214 #endif /* LOCAL_ROUTINES */
8215 
8216  return BM_SUCCESS;
8217 }
static int bm_lock_buffer_mutex(BUFFER *pbuf)
Definition: midas.cxx:7911
#define RPC_BM_SET_CACHE_SIZE
Definition: mrpc.h:42
#define M_MALLOC(x)
Definition: midas.h:1550
#define MIN_WRITE_CACHE_SIZE
Definition: midas.h:264
#define MAX_WRITE_CACHE_SIZE_DIV
Definition: midas.h:265
std::timed_mutex buffer_mutex
Definition: midas.h:994
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [1/2]

static int bm_skip_event ( BUFFER pbuf)
static

Definition at line 10800 of file midas.cxx.

10801 {
 /* skip all pending events: drop anything already copied into the local
  * read cache, then advance this client's read pointer to the write pointer */
10802  /* clear read cache */
10803  if (pbuf->read_cache_size > 0) {
10804 
10805  int status = bm_lock_buffer_read_cache(pbuf);
10806 
10807  if (status != BM_SUCCESS)
10808  return status;
10809 
10810  pbuf->read_cache_rp = 0;
10811  pbuf->read_cache_wp = 0;
10812 
10813  pbuf->read_cache_mutex.unlock();
10814  }
10815 
10816  bm_lock_buffer_guard pbuf_guard(pbuf);
10817 
10818  if (!pbuf_guard.is_locked())
10819  return pbuf_guard.get_status();
10820 
10821  BUFFER_HEADER *pheader = pbuf->buffer_header;
10822 
10823  /* forward read pointer to global write pointer */
10824  BUFFER_CLIENT *pclient = bm_get_my_client(pbuf, pheader);
10825  pclient->read_pointer = pheader->write_pointer;
10826 
10827  return BM_SUCCESS;
10828 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_skip_event() [2/2]

INT bm_skip_event ( INT  buffer_handle)

Skip all events in current buffer.

Useful for single event displays to see the newest events

Parameters
buffer_handleHandle of the buffer. Must be obtained via bm_open_buffer.
Returns
BM_SUCCESS, BM_INVALID_HANDLE, RPC_NET_ERROR

Definition at line 10841 of file midas.cxx.

10841  {
 /* remote connection: forward the skip request over RPC */
10842  if (rpc_is_remote())
10843  return rpc_call(RPC_BM_SKIP_EVENT, buffer_handle);
10844 
10845 #ifdef LOCAL_ROUTINES
10846  {
10847  int status = 0;
10848 
10849  BUFFER *pbuf = bm_get_buffer("bm_skip_event", buffer_handle, &status);
10850 
10851  if (!pbuf)
10852  return status;
10853 
 /* delegate to the static BUFFER* overload which does the actual work */
10854  return bm_skip_event(pbuf);
10855  }
10856 #endif
10857 
10858  return BM_SUCCESS;
10859 }
#define RPC_BM_SKIP_EVENT
Definition: mrpc.h:50
Here is the call graph for this function:

◆ bm_update_last_activity()

static void bm_update_last_activity ( DWORD  millitime)
static

Update last activity time

Definition at line 6080 of file midas.cxx.

6080  {
6081  int pid = ss_getpid();
6082 
6083  std::vector<BUFFER*> mybuffers;
6084 
 /* snapshot the global buffer list under the mutex, then iterate without holding it */
6085  gBuffersMutex.lock();
6086  mybuffers = gBuffers;
6087  gBuffersMutex.unlock();
6088 
6089  for (BUFFER* pbuf : mybuffers) {
6090  if (!pbuf)
6091  continue;
6092  if (pbuf->attached) {
6093 
6094  bm_lock_buffer_guard pbuf_guard(pbuf);
6095 
 /* best effort: skip (rather than fail on) buffers we cannot lock right now */
6096  if (!pbuf_guard.is_locked())
6097  continue;
6098 
 /* stamp last_activity on every client slot belonging to this process */
6099  BUFFER_HEADER *pheader = pbuf->buffer_header;
6100  for (int j = 0; j < pheader->max_client_index; j++) {
6101  BUFFER_CLIENT *pclient = pheader->client + j;
6102  if (pclient->pid == pid) {
6103  pclient->last_activity = millitime;
6104  }
6105  }
6106  }
6107  }
6108 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_update_read_pointer_locked()

static BOOL bm_update_read_pointer_locked ( const char *  caller_name,
BUFFER_HEADER pheader 
)
static

Definition at line 8687 of file midas.cxx.

8687  {
8688  assert(caller_name);
8689 
8690  /* calculate global read pointer as "minimum" of client read pointers */
8691  int min_rp = pheader->write_pointer;
8692 
8693  int i;
8694  for (i = 0; i < pheader->max_client_index; i++) {
8695  BUFFER_CLIENT *pc = pheader->client + i;
8696  if (pc->pid) {
8698 
8699 #if 0
8700  printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, min_rp %d, client [%s] rp %d\n",
8701  pheader->name,
8702  pheader->read_pointer,
8703  pheader->write_pointer,
8704  pheader->size,
8705  min_rp,
8706  pc->name,
8707  pc->read_pointer);
8708 #endif
8709 
8710  if (pheader->read_pointer <= pheader->write_pointer) {
8711  // normal pointers
8712  if (pc->read_pointer < min_rp)
8713  min_rp = pc->read_pointer;
8714  } else {
8715  // inverted pointers
8716  if (pc->read_pointer <= pheader->write_pointer) {
8717  // clients 3 and 4
8718  if (pc->read_pointer < min_rp)
8719  min_rp = pc->read_pointer;
8720  } else {
8721  // clients 1 and 2
8722  int xptr = pc->read_pointer - pheader->size;
8723  if (xptr < min_rp)
8724  min_rp = xptr;
8725  }
8726  }
8727  }
8728  }
8729 
8730  if (min_rp < 0)
8731  min_rp += pheader->size;
8732 
8733  assert(min_rp >= 0);
8734  assert(min_rp < pheader->size);
8735 
8736  if (min_rp == pheader->read_pointer) {
8737  return FALSE;
8738  }
8739 
8740 #if 0
8741  printf("bm_update_read_pointer: [%s] rp %d, wp %d, size %d, new_rp %d, moved\n",
8742  pheader->name,
8743  pheader->read_pointer,
8744  pheader->write_pointer,
8745  pheader->size,
8746  min_rp);
8747 #endif
8748 
8749  pheader->read_pointer = min_rp;
8750 
8751  return TRUE;
8752 }
static void bm_validate_client_pointers_locked(const BUFFER_HEADER *pheader, BUFFER_CLIENT *pclient)
Definition: midas.cxx:8589
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_buffer_locked()

static int bm_validate_buffer_locked ( const BUFFER *  pbuf)
static

Definition at line 6271 of file midas.cxx.

6271  {
6272  const BUFFER_HEADER *pheader = pbuf->buffer_header;
6273  const char *pdata = (const char *) (pheader + 1);
6274 
6275  //printf("bm_validate_buffer: buffer \"%s\"\n", pheader->name);
6276 
6277  //printf("size: %d, rp: %d, wp: %d\n", pheader->size, pheader->read_pointer, pheader->write_pointer);
6278 
6279  //printf("clients: max: %d, num: %d, MAX_CLIENTS: %d\n", pheader->max_client_index, pheader->num_clients, MAX_CLIENTS);
6280 
6281  if (pheader->read_pointer < 0 || pheader->read_pointer >= pheader->size) {
6282  cm_msg(MERROR, "bm_validate_buffer",
6283  "buffer \"%s\" is corrupted: invalid read pointer %d. Size %d, write pointer %d", pheader->name,
6284  pheader->read_pointer, pheader->size, pheader->write_pointer);
6285  return BM_CORRUPTED;
6286  }
6287 
6288  if (pheader->write_pointer < 0 || pheader->write_pointer >= pheader->size) {
6289  cm_msg(MERROR, "bm_validate_buffer",
6290  "buffer \"%s\" is corrupted: invalid write pointer %d. Size %d, read pointer %d", pheader->name,
6291  pheader->write_pointer, pheader->size, pheader->read_pointer);
6292  return BM_CORRUPTED;
6293  }
6294 
6295  if (!bm_validate_rp("bm_validate_buffer_locked", pheader, pheader->read_pointer)) {
6296  cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: read pointer %d is invalid", pheader->name,
6297  pheader->read_pointer);
6298  return BM_CORRUPTED;
6299  }
6300 
6301  int rp = pheader->read_pointer;
6302  int rp0 = -1;
6303  while (rp != pheader->write_pointer) {
6304  if (!bm_validate_rp("bm_validate_buffer_locked", pheader, rp)) {
6305  cm_msg(MERROR, "bm_validate_buffer", "buffer \"%s\" is corrupted: invalid rp %d, last good event at rp %d",
6306  pheader->name, rp, rp0);
6307  return BM_CORRUPTED;
6308  }
6309  //bm_print_event(pdata, rp);
6310  int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6311  if (rp1 < 0) {
6312  cm_msg(MERROR, "bm_validate_buffer",
6313  "buffer \"%s\" is corrupted: invalid event at rp %d, last good event at rp %d", pheader->name, rp, rp0);
6314  return BM_CORRUPTED;
6315  }
6316  rp0 = rp;
6317  rp = rp1;
6318  }
6319 
6320  int i;
6321  for (i = 0; i < MAX_CLIENTS; i++) {
6322  const BUFFER_CLIENT *c = &pheader->client[i];
6323  if (c->pid == 0)
6324  continue;
6325  BOOL get_all = FALSE;
6326  int j;
6327  for (j = 0; j < MAX_EVENT_REQUESTS; j++) {
6328  const EVENT_REQUEST *r = &c->event_request[j];
6329  if (!r->valid)
6330  continue;
6331  BOOL xget_all = r->sampling_type == GET_ALL;
6332  get_all = (get_all || xget_all);
6333  //printf("client slot %d: pid %d, name \"%s\", request %d: id %d, valid %d, sampling_type %d, get_all %d\n", i, c->pid, c->name, j, r->id, r->valid, r->sampling_type, xget_all);
6334  }
6335 
6336  int rp = c->read_pointer;
6337  int rp0 = -1;
6338  while (rp != pheader->write_pointer) {
6339  //bm_print_event(pdata, rp);
6340  int rp1 = bm_next_rp("bm_validate_buffer_locked", pheader, pdata, rp);
6341  if (rp1 < 0) {
6342  cm_msg(MERROR, "bm_validate_buffer",
6343  "buffer \"%s\" is corrupted for client \"%s\" rp %d: invalid event at rp %d, last good event at rp %d",
6344  pheader->name, c->name, c->read_pointer, rp, rp0);
6345  return BM_CORRUPTED;
6346  }
6347  rp0 = rp;
6348  rp = rp1;
6349  }
6350  }
6351 
6352  return BM_SUCCESS;
6353 }
static BOOL bm_validate_rp(const char *who, const BUFFER_HEADER *pheader, int rp)
Definition: midas.cxx:6153
static int bm_next_rp(const char *who, const BUFFER_HEADER *pheader, const char *pdata, int rp)
Definition: midas.cxx:6220
char c
Definition: system.cxx:1316
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_client_pointers_locked()

static void bm_validate_client_pointers_locked ( const BUFFER_HEADER *  pheader,
BUFFER_CLIENT *  pclient 
)
static

Definition at line 8589 of file midas.cxx.

8589  {
8590  assert(pheader->read_pointer >= 0 && pheader->read_pointer <= pheader->size);
8591  assert(pclient->read_pointer >= 0 && pclient->read_pointer <= pheader->size);
8592 
8593  if (pheader->read_pointer <= pheader->write_pointer) {
8594 
8595  if (pclient->read_pointer < pheader->read_pointer) {
8596  cm_msg(MINFO, "bm_validate_client_pointers",
8597  "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8598  pclient->name,
8599  pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8600 
8601  pclient->read_pointer = pheader->read_pointer;
8602  }
8603 
8604  if (pclient->read_pointer > pheader->write_pointer) {
8605  cm_msg(MINFO, "bm_validate_client_pointers",
8606  "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, read pointer %d, size %d",
8607  pclient->name,
8608  pheader->name, pclient->read_pointer, pheader->write_pointer, pheader->read_pointer, pheader->size);
8609 
8610  pclient->read_pointer = pheader->write_pointer;
8611  }
8612 
8613  } else {
8614 
8615  if (pclient->read_pointer < 0) {
8616  cm_msg(MINFO, "bm_validate_client_pointers",
8617  "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8618  pclient->name,
8619  pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8620 
8621  pclient->read_pointer = pheader->read_pointer;
8622  }
8623 
8624  if (pclient->read_pointer >= pheader->size) {
8625  cm_msg(MINFO, "bm_validate_client_pointers",
8626  "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8627  pclient->name,
8628  pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8629 
8630  pclient->read_pointer = pheader->read_pointer;
8631  }
8632 
8633  if (pclient->read_pointer > pheader->write_pointer && pclient->read_pointer < pheader->read_pointer) {
8634  cm_msg(MINFO, "bm_validate_client_pointers",
8635  "Corrected read pointer for client \'%s\' on buffer \'%s\' from %d to %d, write pointer %d, size %d",
8636  pclient->name,
8637  pheader->name, pclient->read_pointer, pheader->read_pointer, pheader->write_pointer, pheader->size);
8638 
8639  pclient->read_pointer = pheader->read_pointer;
8640  }
8641  }
8642 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_validate_rp()

static BOOL bm_validate_rp ( const char *  who,
const BUFFER_HEADER *  pheader,
int  rp 
)
static

Definition at line 6153 of file midas.cxx.

6153  {
6154  if (rp < 0 || rp > pheader->size) {
6155  cm_msg(MERROR, "bm_validate_rp",
6156  "error: buffer \"%s\" is corrupted: rp %d is invalid. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6157  pheader->name,
6158  rp,
6159  pheader->read_pointer,
6160  pheader->write_pointer,
6161  pheader->size,
6162  who);
6163  return FALSE;
6164  }
6165 
6166  if ((rp + (int) sizeof(EVENT_HEADER)) > pheader->size) {
6167  // note ">" here, has to match bm_incr_rp() and bm_write_to_buffer()
6168  cm_msg(MERROR, "bm_validate_rp",
6169  "error: buffer \"%s\" is corrupted: rp %d plus event header point beyond the end of buffer by %d bytes. buffer read_pointer %d, write_pointer %d, size %d, called from %s",
6170  pheader->name,
6171  rp,
6172  (int) (rp + sizeof(EVENT_HEADER) - pheader->size),
6173  pheader->read_pointer,
6174  pheader->write_pointer,
6175  pheader->size,
6176  who);
6177  return FALSE;
6178  }
6179 
6180  return TRUE;
6181 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_free_space_locked()

static int bm_wait_for_free_space_locked ( bm_lock_buffer_guard &  pbuf_guard,
int  timeout_msec,
int  requested_space,
bool  unlock_write_cache 
)
static
  • signal other clients in wait mode
  • validate client index: we could have been removed from the buffer

Definition at line 9047 of file midas.cxx.

9048 {
9049  // return values:
9050  // BM_SUCCESS - have "requested_space" bytes free in the buffer
9051  // BM_CORRUPTED - shared memory is corrupted
9052  // BM_NO_MEMORY - asked for more than buffer size
9053  // BM_ASYNC_RETURN - timeout waiting for free space
 9054  // BM_INVALID_HANDLE - buffer was closed (locks released) (via bm_lock_xxx())
 9055  // SS_ABORT - we are told to shutdown (locks released)
9056 
9057  int status;
9058  BUFFER* pbuf = pbuf_guard.get_pbuf();
9059  BUFFER_HEADER *pheader = pbuf->buffer_header;
9060  char *pdata = (char *) (pheader + 1);
9061 
9062  /* make sure the buffer never completely full:
9063  * read pointer and write pointer would coincide
9064  * and the code cannot tell if it means the
9065  * buffer is 100% full or 100% empty. It will explode
9066  * or lose events */
9067  requested_space += 100;
9068 
9069  if (requested_space >= pheader->size)
9070  return BM_NO_MEMORY;
9071 
9072  DWORD time_start = ss_millitime();
9073  DWORD time_end = time_start + timeout_msec;
9074 
9075  //DWORD blocking_time = 0;
9076  //int blocking_loops = 0;
9077  int blocking_client_index = -1;
9078  char blocking_client_name[NAME_LENGTH];
9079  blocking_client_name[0] = 0;
9080 
9081  while (1) {
9082  while (1) {
9083  /* check if enough space in buffer */
9084 
9085  int free = pheader->read_pointer - pheader->write_pointer;
9086  if (free <= 0)
9087  free += pheader->size;
9088 
9089  //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9090 
9091  if (requested_space < free) { /* note the '<' to avoid 100% filling */
9092  //if (blocking_loops) {
9093  // DWORD wait_time = ss_millitime() - blocking_time;
9094  // printf("blocking client \"%s\", time %d ms, loops %d\n", blocking_client_name, wait_time, blocking_loops);
9095  //}
9096 
9097  if (pbuf->wait_start_time != 0) {
9098  DWORD now = ss_millitime();
9099  DWORD wait_time = now - pbuf->wait_start_time;
9100  pbuf->time_write_wait += wait_time;
9101  pbuf->wait_start_time = 0;
9102  int iclient = pbuf->wait_client_index;
9103  //printf("bm_wait_for_free_space: wait ended: wait time %d ms, blocking client index %d\n", wait_time, iclient);
9104  if (iclient >= 0 && iclient < MAX_CLIENTS) {
9105  pbuf->client_count_write_wait[iclient] += 1;
9106  pbuf->client_time_write_wait[iclient] += wait_time;
9107  }
9108  }
9109 
9110  //if (blocking_loops > 0) {
9111  // printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, found space after %d waits\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec, blocking_loops);
9112  //}
9113 
9114  return BM_SUCCESS;
9115  }
9116 
9117  if (!bm_validate_rp("bm_wait_for_free_space_locked", pheader, pheader->read_pointer)) {
9118  cm_msg(MERROR, "bm_wait_for_free_space",
9119  "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer is invalid",
9120  pheader->name,
9121  pheader->read_pointer,
9122  pheader->write_pointer,
9123  pheader->size,
9124  free,
9125  requested_space);
9126  return BM_CORRUPTED;
9127  }
9128 
9129  const EVENT_HEADER *pevent = (const EVENT_HEADER *) (pdata + pheader->read_pointer);
9130  int event_size = pevent->data_size + sizeof(EVENT_HEADER);
9131  int total_size = ALIGN8(event_size);
9132 
9133 #if 0
9134  printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, blocking event size %d/%d\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, event_size, total_size);
9135 #endif
9136 
9137  if (pevent->data_size <= 0 || total_size <= 0 || total_size > pheader->size) {
9138  cm_msg(MERROR, "bm_wait_for_free_space",
9139  "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer points to an invalid event: data_size %d, event size %d, total_size %d",
9140  pheader->name,
9141  pheader->read_pointer,
9142  pheader->write_pointer,
9143  pheader->size,
9144  free,
9145  requested_space,
9146  pevent->data_size,
9147  event_size,
9148  total_size);
9149  return BM_CORRUPTED;
9150  }
9151 
9152  int blocking_client = -1;
9153 
9154  int i;
9155  for (i = 0; i < pheader->max_client_index; i++) {
9156  BUFFER_CLIENT *pc = pheader->client + i;
9157  if (pc->pid) {
9158  if (pc->read_pointer == pheader->read_pointer) {
9159  /*
9160  First assume that the client with the "minimum" read pointer
9161  is not really blocking due to a GET_ALL request.
9162  */
9163  BOOL blocking = FALSE;
9164  //int blocking_request_id = -1;
9165 
9166  int j;
9167  for (j = 0; j < pc->max_request_index; j++) {
9168  const EVENT_REQUEST *prequest = pc->event_request + j;
9169  if (prequest->valid
9170  && bm_match_event(prequest->event_id, prequest->trigger_mask, pevent)) {
9171  if (prequest->sampling_type & GET_ALL) {
9172  blocking = TRUE;
9173  //blocking_request_id = prequest->id;
9174  break;
9175  }
9176  }
9177  }
9178 
9179  //printf("client [%s] blocking %d, request %d\n", pc->name, blocking, blocking_request_id);
9180 
9181  if (blocking) {
9182  blocking_client = i;
9183  break;
9184  }
9185 
9186  pc->read_pointer = bm_incr_rp_no_check(pheader, pc->read_pointer, total_size);
9187  }
9188  }
9189  } /* client loop */
9190 
9191  if (blocking_client >= 0) {
9192  blocking_client_index = blocking_client;
9193  strlcpy(blocking_client_name, pheader->client[blocking_client].name, sizeof(blocking_client_name));
9194  //if (!blocking_time) {
9195  // blocking_time = ss_millitime();
9196  //}
9197 
9198  //printf("bm_wait_for_free_space: buffer pointers: read: %d, write: %d, free space: %d, bufsize: %d, event size: %d, timeout %d, must wait for more space!\n", pheader->read_pointer, pheader->write_pointer, free, pheader->size, requested_space, timeout_msec);
9199 
9200  // from this "break" we go into timeout check and sleep/wait.
9201  break;
9202  }
9203 
9204  /* no blocking clients. move the read pointer and again check for free space */
9205 
9206  BOOL moved = bm_update_read_pointer_locked("bm_wait_for_free_space", pheader);
9207 
9208  if (!moved) {
9209  cm_msg(MERROR, "bm_wait_for_free_space",
9210  "error: buffer \"%s\" is corrupted: read_pointer %d, write_pointer %d, size %d, free %d, waiting for %d bytes: read pointer did not move as expected",
9211  pheader->name,
9212  pheader->read_pointer,
9213  pheader->write_pointer,
9214  pheader->size,
9215  free,
9216  requested_space);
9217  return BM_CORRUPTED;
9218  }
9219 
9220  /* we freed one event, loop back to the check for free space */
9221  }
9222 
9223  //blocking_loops++;
9224 
9225  /* at least one client is blocking */
9226 
9227  BUFFER_CLIENT *pc = bm_get_my_client(pbuf, pheader);
9228  pc->write_wait = requested_space;
9229 
9230  if (pbuf->wait_start_time == 0) {
9231  pbuf->wait_start_time = ss_millitime();
9232  pbuf->count_write_wait++;
9233  if (requested_space > pbuf->max_requested_space)
9234  pbuf->max_requested_space = requested_space;
9235  pbuf->wait_client_index = blocking_client_index;
9236  }
9237 
9238  DWORD now = ss_millitime();
9239 
9240  //printf("bm_wait_for_free_space: start 0x%08x, now 0x%08x, end 0x%08x, timeout %d, wait %d\n", time_start, now, time_end, timeout_msec, time_end - now);
9241 
9242  int sleep_time_msec = 1000;
9243 
9244  if (timeout_msec == BM_WAIT) {
9245  // wait forever
9246  } else if (timeout_msec == BM_NO_WAIT) {
9247  // no wait
9248  return BM_ASYNC_RETURN;
9249  } else {
9250  // check timeout
9251  if (now >= time_end) {
9252  // timeout!
9253  return BM_ASYNC_RETURN;
9254  }
9255 
9256  sleep_time_msec = time_end - now;
9257 
9258  if (sleep_time_msec <= 0) {
9259  sleep_time_msec = 10;
9260  } else if (sleep_time_msec > 1000) {
9261  sleep_time_msec = 1000;
9262  }
9263  }
9264 
9266 
9267  /* before waiting, unlock everything in the correct order */
9268 
9269  pbuf_guard.unlock();
9270 
9271  if (unlock_write_cache)
9272  pbuf->write_cache_mutex.unlock();
9273 
9274  //printf("bm_wait_for_free_space: blocking client \"%s\"\n", blocking_client_name);
9275 
9276 #ifdef DEBUG_MSG
9277  cm_msg(MDEBUG, "Send sleep: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9278 #endif
9279 
9281  //int idx = bm_validate_client_index(pbuf, FALSE);
9282  //if (idx >= 0)
9283  // pheader->client[idx].write_wait = requested_space;
9284 
9285  //bm_cleanup("bm_wait_for_free_space", ss_millitime(), FALSE);
9286 
9287  status = ss_suspend(sleep_time_msec, MSG_BM);
9288 
9289  /* we are told to shutdown */
9290  if (status == SS_ABORT) {
9291  // NB: buffer is locked!
9292  return SS_ABORT;
9293  }
9294 
9295  /* make sure we do sleep in this loop:
9296  * if we are the mserver receiving data on the event
9297  * socket and the data buffer is full, ss_suspend() will
9298  * never sleep: it will detect data on the event channel,
9299  * call rpc_server_receive() (recursively, we already *are* in
9300  * rpc_server_receive()) and return without sleeping. Result
9301  * is a busy loop waiting for free space in data buffer */
9302 
9303  /* update May 2021: ss_suspend(MSG_BM) no longer looks at
9304  * the event socket, and should sleep now, so this sleep below
9305  * maybe is not needed now. but for safety, I keep it. K.O. */
9306 
9307  if (status != SS_TIMEOUT) {
9308  //printf("ss_suspend: status %d\n", status);
9309  ss_sleep(1);
9310  }
9311 
9312  /* we may be stuck in this loop for an arbitrary long time,
9313  * depending on how other buffer clients read the accumulated data
9314  * so we should update all the timeouts & etc. K.O. */
9315 
9317 
9318  /* lock things again in the correct order */
9319 
9320  if (unlock_write_cache) {
9322 
9323  if (status != BM_SUCCESS) {
9324  // bail out with all locks released
9325  return status;
9326  }
9327  }
9328 
9329  if (!pbuf_guard.relock()) {
9330  if (unlock_write_cache) {
9331  pbuf->write_cache_mutex.unlock();
9332  }
9333 
9334  // bail out with all locks released
9335  return pbuf_guard.get_status();
9336  }
9337 
9338  /* revalidate the client index: we could have been removed from the buffer while sleeping */
9339  pc = bm_get_my_client(pbuf, pheader);
9340 
9341  pc->write_wait = 0;
9342 
9344  //idx = bm_validate_client_index(pbuf, FALSE);
9345  //if (idx >= 0)
9346  // pheader->client[idx].write_wait = 0;
9347  //else {
9348  // cm_msg(MERROR, "bm_wait_for_free_space", "our client index is no longer valid, exiting...");
9349  // status = SS_ABORT;
9350  //}
9351 
9352 #ifdef DEBUG_MSG
9353  cm_msg(MDEBUG, "Send woke up: rp=%d, wp=%d, level=%1.1lf", pheader->read_pointer, pheader->write_pointer, 100 - 100.0 * size / pheader->size);
9354 #endif
9355 
9356  }
9357 }
int get_status() const
Definition: midas.cxx:3174
static BOOL bm_update_read_pointer_locked(const char *caller_name, BUFFER_HEADER *pheader)
Definition: midas.cxx:8687
INT cm_periodic_tasks()
Definition: midas.cxx:5575
#define SS_TIMEOUT
Definition: midas.h:680
#define MDEBUG
Definition: midas.h:567
#define MSG_BM
Definition: msystem.h:295
INT ss_suspend(INT millisec, INT msg)
Definition: system.cxx:4482
INT ss_sleep(INT millisec)
Definition: system.cxx:3567
int max_requested_space
Definition: midas.h:1025
int count_write_wait
Definition: midas.h:1020
int wait_client_index
Definition: midas.h:1024
DWORD time_write_wait
Definition: midas.h:1021
DWORD wait_start_time
Definition: midas.h:1023
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wait_for_more_events_locked()

static int bm_wait_for_more_events_locked ( bm_lock_buffer_guard &  pbuf_guard,
BUFFER_CLIENT *  pc,
int  timeout_msec,
BOOL  unlock_read_cache 
)
static

Definition at line 9359 of file midas.cxx.

9360 {
9361  BUFFER* pbuf = pbuf_guard.get_pbuf();
9362  BUFFER_HEADER* pheader = pbuf->buffer_header;
9363 
9364  //printf("bm_wait_for_more_events_locked: [%s] timeout %d\n", pheader->name, timeout_msec);
9365 
9366  if (pc->read_pointer != pheader->write_pointer) {
9367  // buffer has data
9368  return BM_SUCCESS;
9369  }
9370 
9371  if (timeout_msec == BM_NO_WAIT) {
9372  /* event buffer is empty and we are told to not wait */
9373  if (!pc->read_wait) {
9374  //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait in BM_NO_WAIT!\n", pheader->name, pc->name);
9375  pc->read_wait = TRUE;
9376  }
9377  return BM_ASYNC_RETURN;
9378  }
9379 
9380  DWORD time_start = ss_millitime();
9381  DWORD time_wait = time_start + timeout_msec;
9382  DWORD sleep_time = 1000;
9383  if (timeout_msec == BM_NO_WAIT) {
9384  // default sleep time
9385  } else if (timeout_msec == BM_WAIT) {
9386  // default sleep time
9387  } else {
9388  if (sleep_time > (DWORD)timeout_msec)
9389  sleep_time = timeout_msec;
9390  }
9391 
9392  //printf("time start 0x%08x, end 0x%08x, sleep %d\n", time_start, time_wait, sleep_time);
9393 
9394  while (pc->read_pointer == pheader->write_pointer) {
9395  /* wait until there is data in the buffer (write pointer moves) */
9396 
9397  if (!pc->read_wait) {
9398  //printf("bm_wait_for_more_events: buffer [%s] client [%s] set read_wait!\n", pheader->name, pc->name);
9399  pc->read_wait = TRUE;
9400  }
9401 
9402  pc->last_activity = ss_millitime();
9403 
9405 
9406  // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9407 
9408  pbuf_guard.unlock();
9409 
9410  if (unlock_read_cache)
9411  pbuf->read_cache_mutex.unlock();
9412 
9413  int status = ss_suspend(sleep_time, MSG_BM);
9414 
9415  if (timeout_msec == BM_NO_WAIT) {
9416  // return immediately
9417  } else if (timeout_msec == BM_WAIT) {
9418  // wait forever
9419  } else {
9420  DWORD now = ss_millitime();
9421  //printf("check timeout: now 0x%08x, end 0x%08x, diff %d\n", now, time_wait, time_wait - now);
9422  if (now >= time_wait) {
9423  timeout_msec = BM_NO_WAIT; // cause immediate return
9424  } else {
9425  sleep_time = time_wait - now;
9426  if (sleep_time > 1000)
9427  sleep_time = 1000;
9428  //printf("time start 0x%08x, now 0x%08x, end 0x%08x, sleep %d\n", time_start, now, time_wait, sleep_time);
9429  }
9430  }
9431 
9432  // NB: locking order is: 1st read cache lock, 2nd buffer lock, unlock in reverse order
9433 
9434  if (unlock_read_cache) {
9436  if (status != BM_SUCCESS) {
9437  // bail out with all locks released
9438  return status;
9439  }
9440  }
9441 
9442  if (!pbuf_guard.relock()) {
9443  if (unlock_read_cache) {
9444  pbuf->read_cache_mutex.unlock();
9445  }
9446  // bail out with all locks released
9447  return pbuf_guard.get_status();
9448  }
9449 
9450  /* need to revalidate our BUFFER_CLIENT after releasing the buffer lock
9451  * because we may have been removed from the buffer by bm_cleanup() & co
9452  * due to a timeout or whatever. */
9453  pc = bm_get_my_client(pbuf, pheader);
9454 
9455  /* return if TCP connection broken */
9456  if (status == SS_ABORT)
9457  return SS_ABORT;
9458 
9459  if (timeout_msec == BM_NO_WAIT)
9460  return BM_ASYNC_RETURN;
9461  }
9462 
9463  if (pc->read_wait) {
9464  //printf("bm_wait_for_more_events: buffer [%s] client [%s] clear read_wait!\n", pheader->name, pc->name);
9465  pc->read_wait = FALSE;
9466  }
9467 
9468  return BM_SUCCESS;
9469 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_wakeup_producers_locked()

static void bm_wakeup_producers_locked ( const BUFFER_HEADER *  pheader,
const BUFFER_CLIENT *  pc 
)
static

Definition at line 8754 of file midas.cxx.

8754  {
8755  int i;
8756  int have_get_all_requests = 0;
8757 
8758  for (i = 0; i < pc->max_request_index; i++)
8759  if (pc->event_request[i].valid)
8760  have_get_all_requests |= (pc->event_request[i].sampling_type == GET_ALL);
8761 
8762  /* only GET_ALL requests actually free space in the event buffer */
8763  if (!have_get_all_requests)
8764  return;
8765 
8766  /*
8767  If read pointer has been changed, it may have freed up some space
8768  for waiting producers. So check if free space is now more than 50%
8769  of the buffer size and wake waiting producers.
8770  */
8771 
8772  int free_space = pc->read_pointer - pheader->write_pointer;
8773  if (free_space <= 0)
8774  free_space += pheader->size;
8775 
8776  if (free_space >= pheader->size * 0.5) {
8777  for (i = 0; i < pheader->max_client_index; i++) {
8778  const BUFFER_CLIENT *pc = pheader->client + i;
8779  if (pc->pid && pc->write_wait) {
8780  BOOL send_wakeup = (pc->write_wait < free_space);
8781  //printf("bm_wakeup_producers: buffer [%s] client [%s] write_wait %d, free_space %d, sending wakeup message %d\n", pheader->name, pc->name, pc->write_wait, free_space, send_wakeup);
8782  if (send_wakeup) {
8783  ss_resume(pc->port, "B ");
8784  }
8785  }
8786  }
8787  }
8788 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb()

static void bm_write_buffer_statistics_to_odb ( HNDLE  hDB,
BUFFER *  pbuf,
BOOL  force 
)
static

Definition at line 6550 of file midas.cxx.

6551 {
6552  //printf("bm_buffer_write_statistics_to_odb: buffer [%s] client [%s], lock count %d -> %d, force %d\n", pbuf->buffer_name, pbuf->client_name, pbuf->last_count_lock, pbuf->count_lock, force);
6553 
6554  bm_lock_buffer_guard pbuf_guard(pbuf);
6555 
6556  if (!pbuf_guard.is_locked())
6557  return;
6558 
6559  if (!force) {
6560  if (pbuf->count_lock == pbuf->last_count_lock) {
6561  return;
6562  }
6563  }
6564 
6565  std::string buffer_name = pbuf->buffer_name;
6566  std::string client_name = pbuf->client_name;
6567 
6568  if ((strlen(buffer_name.c_str()) < 1) || (strlen(client_name.c_str()) < 1)) {
6569  // do not call cm_msg() while holding buffer lock, if we are SYSMSG, we will deadlock. K.O.
6570  pbuf_guard.unlock(); // unlock before cm_msg()
6571  cm_msg(MERROR, "bm_write_buffer_statistics_to_odb", "Invalid empty buffer name \"%s\" or client name \"%s\"", buffer_name.c_str(), client_name.c_str());
6572  return;
6573  }
6574 
6575  pbuf->last_count_lock = pbuf->count_lock;
6576 
6577  BUFFER_INFO xbuf(pbuf);
6578  BUFFER_HEADER xheader = *pbuf->buffer_header;
6579  int client_index = pbuf->client_index;
6580 
6581  pbuf_guard.unlock();
6582 
6583  bm_write_buffer_statistics_to_odb_copy(hDB, buffer_name.c_str(), client_name.c_str(), client_index, &xbuf, &xheader);
6584 }
static void bm_write_buffer_statistics_to_odb_copy(HNDLE hDB, const char *buffer_name, const char *client_name, int client_index, BUFFER_INFO *pbuf, BUFFER_HEADER *pheader)
Definition: midas.cxx:6428
int last_count_lock
Definition: midas.h:1022
int count_lock
Definition: midas.h:1017
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_buffer_statistics_to_odb_copy()

static void bm_write_buffer_statistics_to_odb_copy ( HNDLE  hDB,
const char *  buffer_name,
const char *  client_name,
int  client_index,
BUFFER_INFO *  pbuf,
BUFFER_HEADER *  pheader 
)
static

Definition at line 6428 of file midas.cxx.

6429 {
6430  int status;
6431 
6432  DWORD now = ss_millitime();
6433 
6434  HNDLE hKey;
6435  status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6436  if (status != DB_SUCCESS) {
6437  db_create_key(hDB, 0, "/System/Buffers", TID_KEY);
6438  status = db_find_key(hDB, 0, "/System/Buffers", &hKey);
6439  if (status != DB_SUCCESS)
6440  return;
6441  }
6442 
6443  HNDLE hKeyBuffer;
6444  status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6445  if (status != DB_SUCCESS) {
6447  status = db_find_key(hDB, hKey, buffer_name, &hKeyBuffer);
6448  if (status != DB_SUCCESS)
6449  return;
6450  }
6451 
6452  double buf_size = pheader->size;
6453  double buf_rptr = pheader->read_pointer;
6454  double buf_wptr = pheader->write_pointer;
6455 
6456  double buf_fill = 0;
6457  double buf_cptr = 0;
6458  double buf_cused = 0;
6459  double buf_cused_pct = 0;
6460 
6461  if (client_index >= 0 && client_index <= pheader->max_client_index) {
6462  buf_cptr = pheader->client[client_index].read_pointer;
6463 
6464  if (buf_wptr == buf_cptr) {
6465  buf_cused = 0;
6466  } else if (buf_wptr > buf_cptr) {
6467  buf_cused = buf_wptr - buf_cptr;
6468  } else {
6469  buf_cused = (buf_size - buf_cptr) + buf_wptr;
6470  }
6471 
6472  buf_cused_pct = buf_cused / buf_size * 100.0;
6473 
6474  // we cannot write buf_cused and buf_cused_pct into the buffer statistics
6475  // because some other GET_ALL client may have different buf_cused & etc,
6476  // so they must be written into the per-client statistics
6477  // and the web page should look at all the GET_ALL clients and used
6478  // the biggest buf_cused as the whole-buffer "bytes used" value.
6479  }
6480 
6481  if (buf_wptr == buf_rptr) {
6482  buf_fill = 0;
6483  } else if (buf_wptr > buf_rptr) {
6484  buf_fill = buf_wptr - buf_rptr;
6485  } else {
6486  buf_fill = (buf_size - buf_rptr) + buf_wptr;
6487  }
6488 
6489  double buf_fill_pct = buf_fill / buf_size * 100.0;
6490 
6491  db_set_value(hDB, hKeyBuffer, "Size", &buf_size, sizeof(double), 1, TID_DOUBLE);
6492  db_set_value(hDB, hKeyBuffer, "Write pointer", &buf_wptr, sizeof(double), 1, TID_DOUBLE);
6493  db_set_value(hDB, hKeyBuffer, "Read pointer", &buf_rptr, sizeof(double), 1, TID_DOUBLE);
6494  db_set_value(hDB, hKeyBuffer, "Filled", &buf_fill, sizeof(double), 1, TID_DOUBLE);
6495  db_set_value(hDB, hKeyBuffer, "Filled pct", &buf_fill_pct, sizeof(double), 1, TID_DOUBLE);
6496 
6497  status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6498  if (status != DB_SUCCESS) {
6499  db_create_key(hDB, hKeyBuffer, "Clients", TID_KEY);
6500  status = db_find_key(hDB, hKeyBuffer, "Clients", &hKey);
6501  if (status != DB_SUCCESS)
6502  return;
6503  }
6504 
6505  HNDLE hKeyClient;
6506  status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6507  if (status != DB_SUCCESS) {
6508  db_create_key(hDB, hKey, client_name, TID_KEY);
6509  status = db_find_key(hDB, hKey, client_name, &hKeyClient);
6510  if (status != DB_SUCCESS)
6511  return;
6512  }
6513 
6514  db_set_value(hDB, hKeyClient, "count_lock", &pbuf->count_lock, sizeof(int), 1, TID_INT32);
6515  db_set_value(hDB, hKeyClient, "count_sent", &pbuf->count_sent, sizeof(int), 1, TID_INT32);
6516  db_set_value(hDB, hKeyClient, "bytes_sent", &pbuf->bytes_sent, sizeof(double), 1, TID_DOUBLE);
6517  db_set_value(hDB, hKeyClient, "count_write_wait", &pbuf->count_write_wait, sizeof(int), 1, TID_INT32);
6518  db_set_value(hDB, hKeyClient, "time_write_wait", &pbuf->time_write_wait, sizeof(DWORD), 1, TID_UINT32);
6519  db_set_value(hDB, hKeyClient, "max_bytes_write_wait", &pbuf->max_requested_space, sizeof(INT), 1, TID_INT32);
6520  db_set_value(hDB, hKeyClient, "count_read", &pbuf->count_read, sizeof(int), 1, TID_INT32);
6521  db_set_value(hDB, hKeyClient, "bytes_read", &pbuf->bytes_read, sizeof(double), 1, TID_DOUBLE);
6522  db_set_value(hDB, hKeyClient, "get_all_flag", &pbuf->get_all_flag, sizeof(BOOL), 1, TID_BOOL);
6523  db_set_value(hDB, hKeyClient, "read_pointer", &buf_cptr, sizeof(double), 1, TID_DOUBLE);
6524  db_set_value(hDB, hKeyClient, "bytes_used", &buf_cused, sizeof(double), 1, TID_DOUBLE);
6525  db_set_value(hDB, hKeyClient, "pct_used", &buf_cused_pct, sizeof(double), 1, TID_DOUBLE);
6526 
6527  for (int i = 0; i < MAX_CLIENTS; i++) {
6528  if (!pbuf->client_count_write_wait[i])
6529  continue;
6530 
6531  if (pheader->client[i].pid == 0)
6532  continue;
6533 
6534  if (pheader->client[i].name[0] == 0)
6535  continue;
6536 
6537  char str[100 + NAME_LENGTH];
6538 
6539  sprintf(str, "writes_blocked_by/%s/count_write_wait", pheader->client[i].name);
6540  db_set_value(hDB, hKeyClient, str, &pbuf->client_count_write_wait[i], sizeof(int), 1, TID_INT32);
6541 
6542  sprintf(str, "writes_blocked_by/%s/time_write_wait", pheader->client[i].name);
6543  db_set_value(hDB, hKeyClient, str, &pbuf->client_time_write_wait[i], sizeof(DWORD), 1, TID_UINT32);
6544  }
6545 
6546  db_set_value(hDB, hKeyBuffer, "Last updated", &now, sizeof(DWORD), 1, TID_UINT32);
6547  db_set_value(hDB, hKeyClient, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32);
6548 }
#define TID_DOUBLE
Definition: midas.h:350
#define TID_KEY
Definition: midas.h:356
#define TID_BOOL
Definition: midas.h:347
#define TID_INT32
Definition: midas.h:346
INT db_create_key(HNDLE hDB, HNDLE hKey, const char *key_name, DWORD type)
Definition: odb.cxx:3298
INT db_set_value(HNDLE hDB, HNDLE hKeyRoot, const char *key_name, const void *data, INT data_size, INT num_values, DWORD type)
Definition: odb.cxx:5251
int count_sent
Definition: midas.cxx:6391
BOOL get_all_flag
Definition: midas.cxx:6387
int count_lock
Definition: midas.cxx:6390
int count_write_wait
Definition: midas.cxx:6393
double bytes_read
Definition: midas.cxx:6400
int client_count_write_wait[MAX_CLIENTS]
Definition: midas.cxx:6401
DWORD time_write_wait
Definition: midas.cxx:6394
int count_read
Definition: midas.cxx:6399
double bytes_sent
Definition: midas.cxx:6392
DWORD client_time_write_wait[MAX_CLIENTS]
Definition: midas.cxx:6402
int max_requested_space
Definition: midas.cxx:6398
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_statistics_to_odb()

INT bm_write_statistics_to_odb ( void  )

Write buffer statistics of all open buffers to the ODB

Returns
BM_SUCCESS

Definition at line 7244 of file midas.cxx.

7244  {
 /* Iterate over all attached event buffers and write their usage
  * statistics to the ODB. Always returns BM_SUCCESS, even when the
  * ODB is not available (statistics writing is best-effort). */
7245 #ifdef LOCAL_ROUTINES
7246  {
7247  int status;
7248  HNDLE hDB;
7249 
 /* NOTE(review): the documentation extractor stripped the hyperlinked
  * source line 7250/7251 here — presumably the call that obtains the
  * ODB handle and sets 'status' (e.g. cm_get_experiment_database(&hDB, NULL));
  * 'status' is not read uninitialized in the real source. TODO confirm
  * against midas.cxx line 7250. */
7251 
7252  if (status != CM_SUCCESS) {
7253  //printf("bm_write_statistics_to_odb: cannot get ODB handle!\n");
7254  return BM_SUCCESS;
7255  }
7256 
 /* Take a snapshot of the global buffer list under the mutex so the
  * per-buffer work below runs without holding gBuffersMutex. */
7257  std::vector<BUFFER*> mybuffers;
7258 
7259  gBuffersMutex.lock();
7260  mybuffers = gBuffers;
7261  gBuffersMutex.unlock();
7262 
7263  for (BUFFER* pbuf : mybuffers) {
7264  if (!pbuf || !pbuf->attached)
7265  continue;
 /* NOTE(review): stripped source line 7266 — presumably the per-buffer
  * call bm_write_buffer_statistics_to_odb(hDB, pbuf, ...) that performs
  * the actual ODB update. TODO confirm against midas.cxx line 7266. */
7267  }
7268  }
7269 #endif /* LOCAL_ROUTINES */
7270 
7271  return BM_SUCCESS;
7272 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ bm_write_to_buffer_locked()

static void bm_write_to_buffer_locked ( BUFFER_HEADER *  pheader,
int  sg_n,
const char *const  sg_ptr[],
const size_t  sg_len[],
size_t  total_size 
)
static

Definition at line 9471 of file midas.cxx.

9472 {
 /* Copy a scatter/gather list of sg_n segments (sg_ptr[i] / sg_len[i],
  * total_size bytes in all) into the shared-memory ring buffer that
  * immediately follows the BUFFER_HEADER, advancing write_pointer and
  * wrapping at pheader->size.
  *
  * Preconditions (implied by the name and the absence of any free-space
  * check below): the caller holds the buffer lock and has already
  * verified that total_size bytes are available ahead of all readers.
  * Presumably sum(sg_len[0..sg_n-1]) == total_size — TODO confirm at
  * the call sites. */
9473  char *pdata = (char *) (pheader + 1);
9474 
9475  //int old_write_pointer = pheader->write_pointer;
9476 
9477  /* new event fits into the remaining space? */
9478  if ((size_t)pheader->write_pointer + total_size <= (size_t)pheader->size) {
 /* contiguous case: stream all segments back-to-back at write_pointer */
9479  //memcpy(pdata + pheader->write_pointer, pevent, event_size);
9480  char* wptr = pdata + pheader->write_pointer;
9481  for (int i=0; i<sg_n; i++) {
9482  //printf("memcpy %p+%d\n", sg_ptr[i], (int)sg_len[i]);
9483  memcpy(wptr, sg_ptr[i], sg_len[i]);
9484  wptr += sg_len[i];
9485  }
9486  pheader->write_pointer = pheader->write_pointer + total_size;
9487  assert(pheader->write_pointer <= pheader->size);
9488  /* remaining space is smaller than size of an event header? */
9489  if ((pheader->write_pointer + (int) sizeof(EVENT_HEADER)) > pheader->size) {
9490  // note: ">" here to match "bm_incr_rp". If remaining space is exactly
9491  // equal to the event header size, we will write the next event header here,
9492  // then wrap the pointer and write the event data at the beginning of the buffer.
9493  //printf("bm_write_to_buffer_locked: truncate wp %d. buffer size %d, remaining %d, event header size %d, event size %d, total size %d\n", pheader->write_pointer, pheader->size, pheader->size-pheader->write_pointer, (int)sizeof(EVENT_HEADER), event_size, total_size);
9494  pheader->write_pointer = 0;
9495  }
9496  } else {
9497  /* split event */
 /* wrap-around case: 'size' bytes fit before the end of the buffer,
  * the remaining total_size - size bytes continue at offset 0 */
9498  size_t size = pheader->size - pheader->write_pointer;
9499 
9500  //printf("split: wp %d, size %d, avail %d\n", pheader->write_pointer, pheader->size, size);
9501 
9502  //memcpy(pdata + pheader->write_pointer, pevent, size);
9503  //memcpy(pdata, ((const char *) pevent) + size, event_size - size);
9504 
9505  char* wptr = pdata + pheader->write_pointer;
9506  size_t count = 0;
9507 
9508  // copy first part
9509 
 /* copy whole segments while they still fit before the buffer end */
9510  int i = 0;
9511  for (; i<sg_n; i++) {
9512  if (count + sg_len[i] > size)
9513  break;
9514  memcpy(wptr, sg_ptr[i], sg_len[i]);
9515  wptr += sg_len[i];
9516  count += sg_len[i];
9517  }
9518 
9519  //printf("wptr %d, count %d\n", wptr-pdata, count);
9520 
9521  // split segment
9522 
 /* segment i straddles the buffer end: 'first' bytes go before the
  * wrap, 'second' bytes at the start of the buffer.
  * NOTE(review): this reads sg_len[i] unconditionally; i < sg_n is
  * presumably guaranteed because total_size > size forces the loop
  * above to break — TODO confirm the caller contract. */
9523  size_t first = size - count;
9524  size_t second = sg_len[i] - first;
9525  assert(first + second == sg_len[i]);
9526  assert(count + first == size);
9527 
9528  //printf("first %d, second %d\n", first, second);
9529 
9530  memcpy(wptr, sg_ptr[i], first);
9531  wptr = pdata + 0;
9532  count += first;
9533  memcpy(wptr, sg_ptr[i] + first, second);
9534  wptr += second;
9535  count += second;
9536  i++;
9537 
9538  // copy remaining
9539 
 /* any segments after the straddling one land entirely past the wrap */
9540  for (; i<sg_n; i++) {
9541  memcpy(wptr, sg_ptr[i], sg_len[i]);
9542  wptr += sg_len[i];
9543  count += sg_len[i];
9544  }
9545 
9546  //printf("wptr %d, count %d\n", wptr-pdata, count);
9547 
9548  //printf("bm_write_to_buffer_locked: wrap wp %d -> %d. buffer size %d, available %d, wrote %d, remaining %d, event size %d, total size %d\n", pheader->write_pointer, total_size-size, pheader->size, pheader->size-pheader->write_pointer, size, pheader->size - (pheader->write_pointer+size), event_size, total_size);
9549 
 /* new write pointer = number of bytes that wrapped past the start */
9550  pheader->write_pointer = total_size - size;
9551  }
9552 
9553  //printf("bm_write_to_buffer_locked: buf [%s] size %d, wrote %d/%d, wp %d -> %d\n", pheader->name, pheader->size, event_size, total_size, old_write_pointer, pheader->write_pointer);
9554 }
Here is the caller graph for this function:

Variable Documentation

◆ _bm_lock_timeout

int _bm_lock_timeout = 5 * 60 * 1000
static

Definition at line 5915 of file midas.cxx.

◆ _bm_max_event_size

DWORD _bm_max_event_size = 0
static

Definition at line 5910 of file midas.cxx.

◆ _bm_mutex_timeout_sec

double _bm_mutex_timeout_sec = _bm_lock_timeout/1000 + 15.000
static

Definition at line 5916 of file midas.cxx.

◆ defrag_buffer

EVENT_DEFRAG_BUFFER defrag_buffer[MAX_DEFRAG_EVENTS]
static

Definition at line 11248 of file midas.cxx.