#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <qb/qbipc_common.h>
#define MESSAGE_REQ_SYNC_BARRIER 0
#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
#define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2
    /* per-service sync_init callback held in struct service_entry */
    void (*sync_init) (
        const unsigned int *trans_list,
        size_t trans_list_entries,
        const unsigned int *member_list,
        size_t member_list_entries,
        const struct memb_ring_id *ring_id);
struct req_exec_memb_determine_message {
    struct qb_ipc_request_header header __attribute__((aligned(8)));
    struct memb_ring_id ring_id __attribute__((aligned(8)));
};
struct req_exec_barrier_message {
    struct qb_ipc_request_header header __attribute__((aligned(8)));
    struct memb_ring_id ring_id __attribute__((aligned(8)));
};
struct req_exec_service_build_message {
    struct qb_ipc_request_header header __attribute__((aligned(8)));
    struct memb_ring_id ring_id __attribute__((aligned(8)));
    /* ... plus service_list_entries and service_list[] ... */
};
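/*
 * Aside (not part of sync.c): the __attribute__((aligned(8))) on the message
 * members above forces an identical layout on every node, independent of the
 * compiler's default packing.  The standalone sketch below uses stand-in
 * header/ring-id types (fake_header, fake_ring_id are illustrative, not
 * corosync types) to show the resulting offsets.  Compile it separately.
 */
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>

struct fake_header {
    int32_t size;
    int32_t id;
};

struct fake_ring_id {
    uint32_t rep;
    uint64_t seq;
};

struct example_message {
    struct fake_header header __attribute__((aligned(8)));
    struct fake_ring_id ring_id __attribute__((aligned(8)));
};

int main (void)
{
    printf ("header at %zu, ring_id at %zu, total %zu bytes\n",
        offsetof (struct example_message, header),
        offsetof (struct example_message, ring_id),
        sizeof (struct example_message));
    return (0);
}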
static int my_memb_determine = 0;
static unsigned int my_memb_determine_list_entries = 0;
static int my_processing_idx = 0;
static size_t my_member_list_entries = 0;
static size_t my_trans_list_entries = 0;
static int my_processor_list_entries = 0;
static int my_service_list_entries = 0;

static void (*sync_synchronization_completed) (void);
static void sync_deliver_fn (
    unsigned int nodeid,
    const void *msg,
    unsigned int msg_len,
    int endian_conversion_required);

static int schedwrk_processor (const void *context);

static void sync_process_enter (void);

static void *sync_group_handle;
int sync_init (
    int (*sync_callbacks_retrieve) (
        int service_id,
        struct sync_callbacks *callbacks),
    void (*synchronization_completed) (void))
{
    /* ... logs "Couldn't initialize groups interface." and fails if the
     *     totempg groups interface cannot be initialized ... */
    sync_synchronization_completed = synchronization_completed;
    /* ... */
static void sync_barrier_handler (
    unsigned int nodeid,
    const void *msg)
{
    const struct req_exec_barrier_message *req_exec_barrier_message = msg;
    int i, barrier_reached = 1;
    if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
        sizeof (struct memb_ring_id)) != 0) {
        return;
    }
    for (i = 0; i < my_processor_list_entries; i++) {
        if (my_processor_list[i].nodeid == nodeid) {
            my_processor_list[i].received = 1;
        }
    }
    for (i = 0; i < my_processor_list_entries; i++) {
        if (my_processor_list[i].received == 0) {
            barrier_reached = 0;
        }
    }
    if (barrier_reached) {
        log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for %s",
            my_service_list[my_processing_idx].name);
        my_processing_idx += 1;
        if (my_service_list_entries == my_processing_idx) {
            my_memb_determine_list_entries = 0;
            sync_synchronization_completed ();
        } else {
            sync_process_enter ();
        }
    }
}
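/*
 * Aside (not part of sync.c): a standalone sketch of the barrier bookkeeping
 * used by sync_barrier_handler() above.  Each arriving barrier message marks
 * its sender as received; the barrier is reached once every processor in the
 * list has reported.  Type and function names here (demo_processor,
 * demo_barrier_reached) are illustrative.  Compile it separately.
 */
#include <stdio.h>

struct demo_processor {
    unsigned int nodeid;
    int received;
};

static int demo_barrier_reached (const struct demo_processor *list, int entries)
{
    int i;

    for (i = 0; i < entries; i++) {
        if (list[i].received == 0) {
            return (0);
        }
    }
    return (1);
}

int main (void)
{
    struct demo_processor procs[3] = { {1, 0}, {2, 0}, {3, 0} };
    unsigned int nodeid;
    int i;

    /* simulate barrier messages arriving from nodes 1, 2 and 3 */
    for (nodeid = 1; nodeid <= 3; nodeid++) {
        for (i = 0; i < 3; i++) {
            if (procs[i].nodeid == nodeid) {
                procs[i].received = 1;
            }
        }
        printf ("after node %u: barrier %s\n", nodeid,
            demo_barrier_reached (procs, 3) ? "reached" : "not reached");
    }
    return (0);
}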
static void dummy_sync_init (
    const unsigned int *trans_list,
    size_t trans_list_entries,
    const unsigned int *member_list,
    size_t member_list_entries,
    const struct memb_ring_id *ring_id)
{
}

static void dummy_sync_abort (void)
{
}

static int dummy_sync_process (void)
{
    return (0);
}

static void dummy_sync_activate (void)
{
}
static int service_entry_compare (
    const void *a,
    const void *b)
static void sync_memb_determine (
    unsigned int nodeid,
    const void *msg)
{
    const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg;
    unsigned int i;
    int found = 0;

    if (memcmp (&req_exec_memb_determine_message->ring_id,
        &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) {
        return;
    }
    my_memb_determine = 1;
    for (i = 0; i < my_memb_determine_list_entries; i++) {
        if (my_memb_determine_list[i] == nodeid) {
            found = 1;
        }
    }
    if (found == 0) {
        my_memb_determine_list[my_memb_determine_list_entries] = nodeid;
        my_memb_determine_list_entries += 1;
    }
}
static void sync_service_build_handler (
    unsigned int nodeid,
    const void *msg)
{
    const struct req_exec_service_build_message *req_exec_service_build_message = msg;
    int i, j;
    int found;
    int barrier_reached = 1;
    int qsort_trigger = 0;

    if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
        sizeof (struct memb_ring_id)) != 0) {
        return;
    }
    for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
        found = 0;
        for (j = 0; j < my_service_list_entries; j++) {
            if (req_exec_service_build_message->service_list[i] ==
                my_service_list[j].service_id) {
                found = 1;
                break;
            }
        }
        if (found == 0) {
            my_service_list[my_service_list_entries].state = PROCESS;
            my_service_list[my_service_list_entries].service_id =
                req_exec_service_build_message->service_list[i];
            sprintf (my_service_list[my_service_list_entries].name,
                "Unknown External Service (id = %d)\n",
                req_exec_service_build_message->service_list[i]);
            my_service_list[my_service_list_entries].sync_init = dummy_sync_init;
            my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort;
            my_service_list[my_service_list_entries].sync_process = dummy_sync_process;
            my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate;
            my_service_list_entries += 1;
            qsort_trigger = 1;
        }
    }
    if (qsort_trigger) {
        qsort (my_service_list, my_service_list_entries,
            sizeof (struct service_entry), service_entry_compare);
    }
    for (i = 0; i < my_processor_list_entries; i++) {
        if (my_processor_list[i].nodeid == nodeid) {
            my_processor_list[i].received = 1;
        }
    }
    for (i = 0; i < my_processor_list_entries; i++) {
        if (my_processor_list[i].received == 0) {
            barrier_reached = 0;
        }
    }
    if (barrier_reached) {
        sync_process_enter ();
    }
}
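/*
 * Aside (not part of sync.c): the handler above appends previously unknown
 * external services and then re-sorts my_service_list with qsort() and
 * service_entry_compare, whose body is not shown in this excerpt.  The
 * standalone sketch below demonstrates the same pattern with an illustrative
 * comparator that orders entries by service_id.  Compile it separately.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct demo_service_entry {
    int service_id;
    char name[64];
};

static int demo_service_entry_compare (const void *a, const void *b)
{
    const struct demo_service_entry *entry_a = a;
    const struct demo_service_entry *entry_b = b;

    return (entry_a->service_id - entry_b->service_id);
}

int main (void)
{
    struct demo_service_entry list[3];
    size_t entries = 3;
    size_t i;

    list[0].service_id = 21;
    strcpy (list[0].name, "Unknown External Service (id = 21)");
    list[1].service_id = 15;
    strcpy (list[1].name, "local service 15");
    list[2].service_id = 18;
    strcpy (list[2].name, "Unknown External Service (id = 18)");

    qsort (list, entries, sizeof (struct demo_service_entry),
        demo_service_entry_compare);

    for (i = 0; i < entries; i++) {
        printf ("%d: %s\n", list[i].service_id, list[i].name);
    }
    return (0);
}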
static void sync_deliver_fn (
    unsigned int nodeid,
    const void *msg,
    unsigned int msg_len,
    int endian_conversion_required)
{
    struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;

    switch (header->id) {
    case MESSAGE_REQ_SYNC_BARRIER:
        sync_barrier_handler (nodeid, msg);
        break;
    case MESSAGE_REQ_SYNC_SERVICE_BUILD:
        sync_service_build_handler (nodeid, msg);
        break;
    case MESSAGE_REQ_SYNC_MEMB_DETERMINE:
        sync_memb_determine (nodeid, msg);
        break;
    }
}
static void memb_determine_message_transmit (void)
{
    struct iovec iovec;
    struct req_exec_memb_determine_message req_exec_memb_determine_message;

    req_exec_memb_determine_message.header.size =
        sizeof (struct req_exec_memb_determine_message);
    req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE;

    memcpy (&req_exec_memb_determine_message.ring_id,
        &my_memb_determine_ring_id,
        sizeof (struct memb_ring_id));

    iovec.iov_base = (char *)&req_exec_memb_determine_message;
    iovec.iov_len = sizeof (req_exec_memb_determine_message);

    /* ... multicast the message to the sync group (call elided in this excerpt) ... */
}
static void barrier_message_transmit (void)
{
    struct iovec iovec;
    struct req_exec_barrier_message req_exec_barrier_message;

    req_exec_barrier_message.header.size =
        sizeof (struct req_exec_barrier_message);
    req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER;

    memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
        sizeof (struct memb_ring_id));

    iovec.iov_base = (char *)&req_exec_barrier_message;
    iovec.iov_len = sizeof (req_exec_barrier_message);

    /* ... multicast the message to the sync group (call elided in this excerpt) ... */
}
static void service_build_message_transmit (
    struct req_exec_service_build_message *service_build_message)
{
    struct iovec iovec;

    service_build_message->header.size =
        sizeof (struct req_exec_service_build_message);
    service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;

    memcpy (&service_build_message->ring_id, &my_ring_id,
        sizeof (struct memb_ring_id));

    iovec.iov_base = (void *)service_build_message;
    iovec.iov_len = sizeof (struct req_exec_service_build_message);

    /* ... multicast the message to the sync group (call elided in this excerpt) ... */
}
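/*
 * Aside (not part of sync.c): each *_message_transmit() above frames a
 * fixed-size, 8-byte-aligned struct (size and id filled into the header) and
 * hands it to the totem transport as a single struct iovec; the multicast
 * call itself is elided in this excerpt.  The standalone sketch below shows
 * the same iovec framing, using writev() on a pipe purely as a stand-in for
 * the multicast.  demo_message is illustrative, not a corosync type.
 */
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/uio.h>

struct demo_message {
    struct {
        int32_t size;
        int32_t id;
    } header __attribute__((aligned(8)));
    uint64_t ring_seq __attribute__((aligned(8)));
};

int main (void)
{
    struct demo_message msg;
    struct iovec iovec;
    char buf[sizeof (struct demo_message)];
    int fds[2];

    memset (&msg, 0, sizeof (msg));
    msg.header.size = sizeof (struct demo_message);
    msg.header.id = 0;    /* e.g. MESSAGE_REQ_SYNC_BARRIER */
    msg.ring_seq = 4;

    iovec.iov_base = (char *)&msg;
    iovec.iov_len = sizeof (msg);

    if (pipe (fds) != 0) {
        return (1);
    }
    if (writev (fds[1], &iovec, 1) != (ssize_t)sizeof (msg)) {
        return (1);
    }
    if (read (fds[0], buf, sizeof (buf)) != (ssize_t)sizeof (buf)) {
        return (1);
    }
    printf ("framed and delivered %zu bytes\n", sizeof (buf));
    return (0);
}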
static void sync_barrier_enter (void)
{
    barrier_message_transmit ();
}

static void sync_process_enter (void)
{
    int i;
    if (my_service_list_entries == 0) {
        my_memb_determine_list_entries = 0;
        sync_synchronization_completed ();
        return;
    }
    for (i = 0; i < my_processor_list_entries; i++) {
        my_processor_list[i].received = 0;
    }
    /* ... kick off schedwrk_processor () via schedwrk_create () ... */
}
static void sync_servicelist_build_enter (
    const unsigned int *member_list,
    size_t member_list_entries,
    const struct memb_ring_id *ring_id)
{
    struct req_exec_service_build_message service_build;
    struct sync_callbacks sync_callbacks;
    int i;

    for (i = 0; i < member_list_entries; i++) {
        my_processor_list[i].nodeid = member_list[i];
        my_processor_list[i].received = 0;
    }
    my_processor_list_entries = member_list_entries;

    memcpy (my_member_list, member_list,
        member_list_entries * sizeof (unsigned int));
    my_member_list_entries = member_list_entries;

    my_processing_idx = 0;

    my_service_list_entries = 0;
    for (i = 0; i < SERVICES_COUNT_MAX; i++) {
        if (my_sync_callbacks_retrieve (i, &sync_callbacks) == -1) {
            continue;
        }
        /* ... skip services that provide no sync callbacks ... */
        my_service_list[my_service_list_entries].state = INIT;
        my_service_list[my_service_list_entries].service_id = i;
        strcpy (my_service_list[my_service_list_entries].name,
            sync_callbacks.name);
        /* ... copy the service's sync_* callbacks into the entry ... */
        my_service_list_entries += 1;
    }

    for (i = 0; i < my_service_list_entries; i++) {
        service_build.service_list[i] =
            my_service_list[i].service_id;
    }
    service_build.service_list_entries = my_service_list_entries;

    service_build_message_transmit (&service_build);
}
static int schedwrk_processor (const void *context)
{
    int res = 0;

    if (my_service_list[my_processing_idx].state == INIT) {
        unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
        size_t old_trans_list_entries = 0;
        int o, m;

        my_service_list[my_processing_idx].state = PROCESS;

        /* keep only transitional nodes that are still members */
        memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
            sizeof (unsigned int));
        old_trans_list_entries = my_trans_list_entries;

        my_trans_list_entries = 0;
        for (o = 0; o < old_trans_list_entries; o++) {
            for (m = 0; m < my_member_list_entries; m++) {
                if (old_trans_list[o] == my_member_list[m]) {
                    my_trans_list[my_trans_list_entries] = my_member_list[m];
                    my_trans_list_entries++;
                    break;
                }
            }
        }

        my_service_list[my_processing_idx].sync_init (my_trans_list,
            my_trans_list_entries, my_member_list,
            my_member_list_entries,
            &my_ring_id);
    }
    if (my_service_list[my_processing_idx].state == PROCESS) {
        res = my_service_list[my_processing_idx].sync_process ();
        if (res == 0) {
            sync_barrier_enter();
        } else {
            return (-1);
        }
    }
    return (0);
}
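/*
 * Aside (not part of sync.c): before calling a service's sync_init,
 * schedwrk_processor() above rebuilds the transitional node list as the
 * intersection of the saved transitional list and the current member list,
 * so only nodes that survived the configuration change are passed on.  The
 * standalone sketch below reproduces that filtering with illustrative array
 * contents.  Compile it separately; it prints the surviving nodes 2 and 4.
 */
#include <stdio.h>

int main (void)
{
    unsigned int old_trans_list[] = { 1, 2, 3, 4 };
    unsigned int member_list[] = { 2, 4, 5 };
    unsigned int trans_list[4];
    size_t trans_list_entries = 0;
    size_t o, m, i;

    for (o = 0; o < sizeof (old_trans_list) / sizeof (old_trans_list[0]); o++) {
        for (m = 0; m < sizeof (member_list) / sizeof (member_list[0]); m++) {
            if (old_trans_list[o] == member_list[m]) {
                trans_list[trans_list_entries++] = member_list[m];
                break;
            }
        }
    }

    for (i = 0; i < trans_list_entries; i++) {
        printf ("transitional member: %u\n", trans_list[i]);
    }
    return (0);
}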
void sync_start (
    const unsigned int *member_list,
    size_t member_list_entries,
    const struct memb_ring_id *ring_id)
{
    memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));

    if (my_memb_determine) {
        my_memb_determine = 0;
        sync_servicelist_build_enter (my_memb_determine_list,
            my_memb_determine_list_entries, ring_id);
    } else {
        sync_servicelist_build_enter (member_list, member_list_entries,
            ring_id);
    }
}
void sync_save_transitional (
    const unsigned int *member_list,
    size_t member_list_entries,
    const struct memb_ring_id *ring_id)
{
    memcpy (my_trans_list, member_list, member_list_entries *
        sizeof (unsigned int));
    my_trans_list_entries = member_list_entries;
}
    /* ... abort the service currently being synchronized ... */
    my_service_list[my_processing_idx].sync_abort ();

void sync_memb_list_determine (const struct memb_ring_id *ring_id)
{
    memcpy (&my_memb_determine_ring_id, ring_id,
        sizeof (struct memb_ring_id));
    memb_determine_message_transmit ();
}

void sync_memb_list_abort (void)
{
    my_memb_determine_list_entries = 0;
    memset (&my_memb_determine_ring_id, 0,
        sizeof (struct memb_ring_id));
}