From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (qmail 5276 invoked by alias); 14 Aug 2008 14:46:36 -0000 Received: (qmail 5267 invoked by alias); 14 Aug 2008 14:46:36 -0000 X-Spam-Status: Yes, hits=5.8 required=5.0 tests=AWL,BAYES_50,J_CHICKENPOX_21,J_CHICKENPOX_23,J_CHICKENPOX_24,J_CHICKENPOX_25,J_CHICKENPOX_27,J_CHICKENPOX_28,J_CHICKENPOX_42,J_CHICKENPOX_43,J_CHICKENPOX_44,J_CHICKENPOX_46,J_CHICKENPOX_48,J_CHICKENPOX_53,J_CHICKENPOX_56,J_CHICKENPOX_62,J_CHICKENPOX_63,J_CHICKENPOX_64,J_CHICKENPOX_65,J_CHICKENPOX_66,J_CHICKENPOX_72,J_CHICKENPOX_75,J_CHICKENPOX_83,KAM_MX,SPF_HELO_PASS X-Spam-Flag: YES X-Spam-Check-By: sourceware.org X-Spam-Checker-Version: SpamAssassin 3.2.4 (2008-01-01) on bastion.fedora.phx.redhat.com X-Spam-Level: Subject: master - ccs: move ccs/daemon to config/daemons/ccds and mark it legacy code To: cluster-cvs-relay@redhat.com X-Project: Cluster Project X-Git-Module: cluster.git X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 04df458184148162c46545d7d04d415c1ab8a361 X-Git-Newrev: 64b733dddde329ce48b08dc45d68e4dcc60af575 From: "Fabio M. Di Nitto" Message-Id: <20080814142703.39C1BA8263@lists.fedorahosted.org> Date: Thu, 14 Aug 2008 15:42:00 -0000 X-Scanned-By: MIMEDefang 2.58 on 172.16.52.254 Mailing-List: contact cluster-cvs-help@sourceware.org; run by ezmlm Precedence: bulk List-Id: List-Subscribe: List-Post: List-Help: , Sender: cluster-cvs-owner@sourceware.org X-SW-Source: 2008-q3/txt/msg00257.txt.bz2 Gitweb: http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=64b733dddde329ce48b08dc45d68e4dcc60af575 Commit: 64b733dddde329ce48b08dc45d68e4dcc60af575 Parent: 04df458184148162c46545d7d04d415c1ab8a361 Author: Fabio M. Di Nitto AuthorDate: Thu Aug 7 08:44:18 2008 +0200 Committer: Fabio M. Di Nitto CommitterDate: Thu Aug 14 15:18:03 2008 +0200 ccs: move ccs/daemon to config/daemons/ccds and mark it legacy code remove support for --without_ccs. 
ccs is now fully obsoleted and you need to specify --enable_legacy_code to build it. adapt top level Makefile to deal with cluster/ccs removal. fix build dependencies around cman/lib that's now required before config: target. move ccsd man page in config/daemons/man. Signed-off-by: Fabio M. Di Nitto --- Makefile | 7 +- ccs/Makefile | 4 - ccs/daemon/Makefile | 34 - ccs/daemon/ccsd.c | 922 ------------------------ ccs/daemon/cluster_mgr.c | 688 ------------------ ccs/daemon/cluster_mgr.h | 6 - ccs/daemon/cnx_mgr.c | 1393 ------------------------------------ ccs/daemon/cnx_mgr.h | 7 - ccs/daemon/comm_headers.h | 48 -- ccs/daemon/debug.h | 9 - ccs/daemon/globals.c | 19 - ccs/daemon/globals.h | 23 - ccs/daemon/misc.c | 288 -------- ccs/daemon/misc.h | 19 - ccs/man/Makefile | 6 - ccs/man/ccsd.8 | 74 -- config/Makefile | 2 +- config/daemons/Makefile | 8 + config/daemons/ccsd/Makefile | 37 + config/daemons/ccsd/ccsd.c | 922 ++++++++++++++++++++++++ config/daemons/ccsd/cluster_mgr.c | 688 ++++++++++++++++++ config/daemons/ccsd/cluster_mgr.h | 6 + config/daemons/ccsd/cnx_mgr.c | 1393 ++++++++++++++++++++++++++++++++++++ config/daemons/ccsd/cnx_mgr.h | 7 + config/daemons/ccsd/comm_headers.h | 48 ++ config/daemons/ccsd/debug.h | 9 + config/daemons/ccsd/globals.c | 19 + config/daemons/ccsd/globals.h | 23 + config/daemons/ccsd/misc.c | 288 ++++++++ config/daemons/ccsd/misc.h | 19 + config/daemons/man/Makefile | 9 + config/daemons/man/ccsd.8 | 74 ++ config/libs/libccscompat/Makefile | 2 +- config/tools/ccs_tool/Makefile | 9 +- make/defines.mk.input | 1 - 35 files changed, 3558 insertions(+), 3553 deletions(-) diff --git a/Makefile b/Makefile index 260235a..f11b298 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,14 @@ include make/defines.mk REALSUBDIRS = gnbd-kernel/src gfs-kernel/src/gfs \ - config cman ccs dlm fence/libfenced group \ + cman/lib config cman dlm fence/libfenced group \ fence gfs gfs2 gnbd rgmanager bindings doc SUBDIRS = $(filter-out \ $(if 
${without_gnbd-kernel/src},gnbd-kernel/src) \ $(if ${without_gfs-kernel/src/gfs},gfs-kernel/src/gfs) \ + $(if ${without_cman},cman/lib) \ $(if ${without_cman},cman) \ - $(if ${without_ccs},ccs) \ $(if ${without_dlm},dlm) \ $(if ${without_fence},fence/libfenced) \ $(if ${without_group},group) \ @@ -32,9 +32,8 @@ gfs-kernel: gfs-kernel/src/gfs # Dependencies -config: +config: cman/lib cman: config -ccs: cman dlm: config fence/libfenced: group: cman dlm fence/libfenced diff --git a/ccs/Makefile b/ccs/Makefile deleted file mode 100644 index 7eab21a..0000000 --- a/ccs/Makefile +++ /dev/null @@ -1,4 +0,0 @@ -include ../make/defines.mk -include $(OBJDIR)/make/passthrough.mk - -SUBDIRS=daemon man diff --git a/ccs/daemon/Makefile b/ccs/daemon/Makefile deleted file mode 100644 index ade527e..0000000 --- a/ccs/daemon/Makefile +++ /dev/null @@ -1,34 +0,0 @@ -TARGET= ccsd - -SBINDIRT=$(TARGET) - -all: ${TARGET} - -include ../../make/defines.mk -include $(OBJDIR)/make/cobj.mk -include $(OBJDIR)/make/clean.mk -include $(OBJDIR)/make/install.mk -include $(OBJDIR)/make/uninstall.mk - -OBJS= ccsd.o \ - cnx_mgr.o \ - cluster_mgr.o \ - misc.o \ - globals.o - -CFLAGS += -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -CFLAGS += -I${cmanincdir} `xml2-config --cflags` -I${corosyncincdir} -CFLAGS += -I$(S) -I$(S)/../include -I$(S)/../common -CFLAGS += -I${incdir} - -LDFLAGS += -L${cmanlibdir} -lcman -LDFLAGS += -L${corosynclibdir} -llogsys -LDFLAGS += `xml2-config --libs` -lpthread -LDFLAGS += -L${libdir} - -${TARGET}: ${OBJS} - $(CC) -o $@ $^ $(LDFLAGS) - -clean: generalclean - --include $(OBJS:.o=.d) diff --git a/ccs/daemon/ccsd.c b/ccs/daemon/ccsd.c deleted file mode 100644 index d3b63d8..0000000 --- a/ccs/daemon/ccsd.c +++ /dev/null @@ -1,922 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "debug.h" -#include "cnx_mgr.h" -#include "cluster_mgr.h" -#include 
"globals.h" -#include "comm_headers.h" -#include "misc.h" - -#include "copyright.cf" - -int debug = 0; -extern volatile int quorate; -int no_manager_opt=0; -static int exit_now=0; -static unsigned int flags=0; -static sigset_t signal_mask; -static int signal_received = 0; -#define FLAG_NODAEMON 1 - -static char *parse_cli_args(int argc, char *argv[]); -static int check_cluster_conf(void); -static void daemonize(void); -static void print_start_msg(char *msg); -static int join_group(int sfd, int loopback, int port); -static int setup_local_socket(int backlog); -static inline void process_signals(void); - -int main(int argc, char *argv[]){ - int i,error=0; - int trueint = 1; - int sfds[3] = {-1,-1,-1}, afd; - struct sockaddr_storage addr; - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; - int addr_size=0; - fd_set rset, tmp_set; - char *msg; - unsigned int logmode; - - logsys_init("CCS", LOG_MODE_OUTPUT_STDERR | LOG_MODE_OUTPUT_SYSLOG_THREADED | LOG_MODE_OUTPUT_FILE | LOG_MODE_FILTER_DEBUG_FROM_SYSLOG | LOG_MODE_BUFFER_BEFORE_CONFIG, SYSLOGFACILITY, SYSLOGLEVEL, LOGDIR "/ccs.log"); - - msg = parse_cli_args(argc, argv); - - if(getenv("CCS_DEBUGLOG")) - debug = 1; - - /* enable debug as early as possible */ - if(debug) - logsys_config_priority_set (LOG_LEVEL_DEBUG); - - if(check_cluster_conf()){ - /* check_cluster_conf will print out errors if there are any */ - exit(EXIT_FAILURE); - } - - logmode = logsys_config_mode_get(); - - if(logmode & LOG_MODE_BUFFER_BEFORE_CONFIG) { - log_printf(LOG_DEBUG, "Using default CCS logsys config options\n"); - logmode &= ~LOG_MODE_BUFFER_BEFORE_CONFIG; - logmode |= LOG_MODE_FLUSH_AFTER_CONFIG; - logsys_config_mode_set (logmode); - } - - daemonize(); - - print_start_msg(msg); - - if(msg){ - free(msg); - } - - if (!no_manager_opt){ - if(start_cluster_monitor_thread()){ - log_printf(LOG_ERR, "Unable to create thread.\n"); - exit(EXIT_FAILURE); - } - } - - memset(&addr, 
0, sizeof(struct sockaddr_storage)); - - /** Setup the socket to communicate with the CCS library **/ - if(IPv6 && (sfds[0] = socket(PF_INET6, SOCK_STREAM, 0)) < 0){ - if(IPv6 == -1){ - log_printf(LOG_DEBUG, "Unable to create IPv6 socket:: %s\n", strerror(errno)); - IPv6=0; - } else { - log_printf(LOG_ERR, "Unable to create IPv6 socket"); - exit(EXIT_FAILURE); - } - } else { - /* IPv6 is no longer optional for ccsd - IPv6 = (IPv6)? 1: 0; - */ - } - - log_printf(LOG_DEBUG, "Using %s\n", IPv6?"IPv6":"IPv4"); - - if(!IPv6 && (sfds[0] = socket(PF_INET, SOCK_STREAM, 0)) < 0){ - log_printf(LOG_ERR, "Unable to create IPv4 socket"); - exit(EXIT_FAILURE); - } - - if(setsockopt(sfds[0], SOL_SOCKET, SO_REUSEADDR, &trueint, sizeof(int))){ - log_printf(LOG_ERR, "Unable to set socket option"); - exit(EXIT_FAILURE); - } - - if(IPv6){ - addr_size = sizeof(struct sockaddr_in6); - addr6->sin6_family = AF_INET6; - addr6->sin6_addr = in6addr_loopback; - addr6->sin6_port = htons(frontend_port); - } else { - addr_size = sizeof(struct sockaddr_in); - addr4->sin_family = AF_INET; - /* addr4->sin_addr.s_addr = INADDR_LOOPBACK; */ - inet_aton("127.0.0.1", (struct in_addr *)&(addr4->sin_addr.s_addr)); - addr4->sin_port = htons(frontend_port); - } - - if(bind(sfds[0], (struct sockaddr *)&addr, addr_size) < 0){ - log_printf(LOG_ERR, "Unable to bind socket"); - close(sfds[0]); - exit(EXIT_FAILURE); - } - - listen(sfds[0], 5); - - - /** Setup the socket to communicate with the CCS library **/ - sfds[1] = socket((IPv6)? 
PF_INET6: PF_INET, SOCK_DGRAM, 0); - if(sfds[1] < 0){ - log_printf(LOG_ERR, "Socket creation failed"); - exit(EXIT_FAILURE); - } else { - int trueint = 1; - if(setsockopt(sfds[1], SOL_SOCKET, SO_REUSEADDR, &trueint, sizeof(int))){ - log_printf(LOG_ERR, "Unable to set socket option"); - exit(EXIT_FAILURE); - } - } - - if(IPv6){ - addr6->sin6_family = AF_INET6; - addr6->sin6_addr = in6addr_any; - addr6->sin6_port = htons(backend_port); - } else { - addr4->sin_family = AF_INET; - addr4->sin_addr.s_addr = INADDR_ANY; - addr4->sin_port = htons(backend_port); - } - - if(bind(sfds[1], (struct sockaddr *)&addr, addr_size) < 0){ - log_printf(LOG_ERR, "Unable to bind socket"); - close(sfds[1]); - return -errno; - } - - if(IPv6 || multicast_address){ - if(join_group(sfds[1], 1, backend_port)){ - log_printf(LOG_ERR, "Unable to join multicast group.\n"); - exit(EXIT_FAILURE); - } - } - - /* Set up the unix (local) socket for CCS lib comms */ - sfds[2] = setup_local_socket(SOMAXCONN); - - FD_ZERO(&rset); - FD_SET(sfds[0], &rset); - FD_SET(sfds[1], &rset); - if (sfds[2] >= 0) - FD_SET(sfds[2], &rset); - - log_printf(LOG_DEBUG, "Sending SIGTERM to parent\n"); - kill(getppid(), SIGTERM); - - while(1){ - unsigned int len = addr_size; - - process_signals(); - - tmp_set = rset; - - if((select(FD_SETSIZE, &tmp_set, NULL,NULL,NULL) < 0)){ - if(errno != EINTR){ - log_printf(LOG_ERR, "Select failed"); - } - continue; - } - - for(i=0; i<3; i++){ - if(sfds[i] < 0 || !FD_ISSET(sfds[i], &tmp_set)){ - continue; - } - if(i == 0){ - uint16_t port; - log_printf(LOG_DEBUG, "NORMAL CCS REQUEST.\n"); - afd = accept(sfds[i], (struct sockaddr *)&addr, &len); - if(afd < 0){ - log_printf(LOG_ERR, "Unable to accept connection"); - continue; - } - - port = (IPv6) ? 
addr6->sin6_port : addr4->sin_port; - - log_printf(LOG_DEBUG, "Connection requested from port %u.\n", ntohs(port)); - - if(ntohs(port) > 1024){ - log_printf(LOG_ERR, "Refusing connection from port > 1024: port = %d", ntohs(port)); - close(afd); - continue; - } - if((error = process_request(afd))){ - log_printf(LOG_ERR, "Error while processing request: %s\n", strerror(-error)); - } - close(afd); - } else if (i == 2) { - log_printf(LOG_DEBUG, "NORMAL CCS REQUEST.\n"); - afd = accept(sfds[i], NULL, NULL); - if(afd < 0){ - log_printf(LOG_ERR, "Unable to accept connection"); - continue; - } - - log_printf(LOG_DEBUG, "Connection requested from local socket\n"); - - if((error = process_request(afd))){ - log_printf(LOG_ERR, "Error while processing request: %s\n", strerror(-error)); - } - close(afd); - } else { - log_printf(LOG_DEBUG, "BROADCAST REQUEST.\n"); - if((error = process_broadcast(sfds[i]))){ - log_printf(LOG_ERR, "Error while processing broadcast: %s\n", strerror(-error)); - } - } - } - } - logsys_exit(); - exit(EXIT_SUCCESS); -} - - -/** - * print_usage - print usage information - * @stream: open file stream to print to - * - */ -static void print_usage(FILE *stream){ - CCSENTER("print_usage"); - fprintf(stream, - "Usage:\n" - "\n" - "ccsd [Options]\n" - "\n" - "Options:\n" - " -4 Use IPv4 only.\n" - " -6 Use IPv6 only.\n" - " -I Use IP for everything (disables local sockets)\n" - " -h Help.\n" - " -m Specify multicast address (\"default\" ok).\n" - " -n No Daemon. 
Run in the foreground.\n" - " -d Enable debugging output.\n" - " -t Multicast threshold (aka Time to Live) value.\n" - " -P [bcf]:# Specify various port numbers.\n" - " -V Print version information.\n" - " -X No cluster manager, just read local " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n" - ); - CCSEXIT("print_usage"); -} - - -static int is_multicast_addr(char *addr_string){ - int rtn = 0; - struct sockaddr_storage addr; - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; - - CCSENTER("is_multicast_addr"); - - if(inet_pton(AF_INET6, addr_string, &(addr6->sin6_addr)) > 0){ - if(IN6_IS_ADDR_MULTICAST(&addr6->sin6_addr)){ - rtn = AF_INET6; - } - } else if(inet_pton(AF_INET, addr_string, &(addr4->sin_addr)) > 0){ - if(IN_MULTICAST(ntohl(addr4->sin_addr.s_addr))){ - rtn = AF_INET; - } - } - CCSEXIT("is_multicast_addr"); - return rtn; -} - - -/** - * parse_cli_args - * @argc: - * @argv: - * - * This function parses the command line arguments and sets the - * appropriate flags in the global 'flags' variable. Additionally, - * it sets the global 'config_file_location'. This function - * will either succeed or cause the program to exit. 
- * - * Returns: string (or NULL) describing changes, exit(EXIT_FAILURE) on error - */ -static char *parse_cli_args(int argc, char *argv[]){ - int c, error=0; - int buff_size=512; - char buff[buff_size]; - int buff_index=0; - - CCSENTER("parse_cli_args"); - - config_file_location = strdup(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); - lockfile_location = strdup(DEFAULT_CCSD_LOCKFILE); - - if(!config_file_location || !lockfile_location){ - fprintf(stderr, "Insufficient memory.\n"); - error = -ENOMEM; - goto fail; - } - - memset(buff, 0, buff_size); - - while((c = getopt(argc, argv, "46Icdf:hlm:nP:t:sVX")) != -1){ - switch(c){ - case '4': - if(IPv6 == 1){ - fprintf(stderr, - "Setting protocol to IPv4 conflicts with multicast address.\n"); - error = -EINVAL; - goto fail; - } - IPv6=0; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " IP Protocol:: IPv4 only\n"); - break; - case '6': - if(IPv6 == 0){ - fprintf(stderr, - "Setting protocol to IPv6 conflicts with previous protocol choice.\n"); - error = -EINVAL; - goto fail; - } - IPv6=1; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " IP Protocol:: IPv6 only\n"); - break; - case 'I': - if (use_local) { - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Communication:: Local sockets disabled\n"); - } - use_local = 0; - break; - case 'c': - fprintf(stderr, "The '-c' option is deprecated.\n" - "Try '-h' for help.\n"); - error = -EINVAL; - goto fail; - case 'd': - debug = 1; - break; - case 'f': /* might be usable for upgrade */ - free(config_file_location); - config_file_location = optarg; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Config file location:: %s\n", optarg); - break; - case 'h': - print_usage(stdout); - exit(EXIT_SUCCESS); - case 'l': - fprintf(stderr, "The '-l' option is deprecated.\n" - "Try '-h' for help.\n"); - error = -EINVAL; - goto fail; - case 'm': - if(strcmp("default", optarg)){ - int type = is_multicast_addr(optarg); - 
if((IPv6 == 1) && (type != AF_INET6)){ - fprintf(stderr, "%s is not a valid IPv6 multicast address.\n", optarg); - error = -EINVAL; - goto fail; - } else if((IPv6 == 0) && (type != AF_INET)){ - fprintf(stderr, "%s is not a valid IPv4 multicast address.\n", optarg); - error = -EINVAL; - goto fail; - } else if(type == 0){ - fprintf(stderr, "%s is not a valid multicast address.\n", optarg); - error = -EINVAL; - goto fail; - } else { - IPv6 = (type == AF_INET6)? 1: 0; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " IP Protocol:: %s only*\n", - (IPv6)? "IPv6" : "IPv4"); - } - } - multicast_address = optarg; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Multicast (%s):: SET\n", optarg); - break; - case 'n': - flags |= FLAG_NODAEMON; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " No Daemon:: SET\n"); - break; - case 'p': - free(lockfile_location); - lockfile_location = optarg; - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Lock file location:: %s\n", optarg); - break; - case 'P': - if(optarg[1] != ':'){ - fprintf(stderr, "Bad argument to '-P' option.\n" - "Try '-h' for help.\n"); - error = -EINVAL; - goto fail; - } - switch(optarg[0]){ - case 'b': /* backend port number */ - backend_port = atoi(optarg+2); - if(backend_port < 1024){ - fprintf(stderr, "Bad backend port number.\n"); - error = -EINVAL; - goto fail; - } - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Backend Port:: %d\n", backend_port); - break; - case 'c': /* cluster base port number */ - cluster_base_port = atoi(optarg+2); - if(cluster_base_port < 1024){ - fprintf(stderr, "Bad cluster base port number.\n"); - error = -EINVAL; - goto fail; - } - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Cluster base port:: %d\n", cluster_base_port); - break; - case 'f': /* frontend port number */ - frontend_port = atoi(optarg+2); - if(frontend_port < 1024){ - fprintf(stderr, "Bad frontend port 
number.\n"); - error = -EINVAL; - goto fail; - } - buff_index += snprintf(buff+buff_index, buff_size-buff_index, - " Frontend Port:: %d\n", frontend_port); - break; - default: - fprintf(stderr, "Bad argument to '-P' option.\n" - "Try '-h' for help.\n"); - error = -EINVAL; - goto fail; - } - break; - case 's': - fprintf(stderr, "The '-s' option is deprecated.\n" - "Try '-h' for help.\n"); - error = -EINVAL; - goto fail; - case 't': - ttl = atoi(optarg); - break; - case 'V': - printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); - printf("%s\n", REDHAT_COPYRIGHT); - exit(EXIT_SUCCESS); - case 'X': - no_manager_opt = 1; - quorate = 1; - break; - default: - print_usage(stderr); - error = -EINVAL; - goto fail; - } - } - - fail: - CCSEXIT("parse_cli_args"); - - if(error){ - exit(EXIT_FAILURE); - } - if(strlen(buff)){ - return(strdup(buff)); - } else { - return NULL; - } -} - - -/* - * check_cluster_conf - check validity of local copy of cluster.conf - * - * This function tries to parse the xml doc at 'config_file_location'. - * If it fails, it gives instructions to the user. - * - * Returns: 0 on success, -1 on failure - */ -static int check_cluster_conf(void){ - struct stat stat_buf; - xmlDocPtr doc = NULL; - - CCSENTER("check_cluster_conf"); - - if(!stat(config_file_location, &stat_buf)){ - doc = xmlParseFile(config_file_location); - if(!doc){ - log_printf(LOG_ERR, "\nUnable to parse %s.\n" - "You should either:\n" - " 1. Correct the XML mistakes, or\n" - " 2. (Re)move the file and attempt to grab a " - "valid copy from the network.\n", config_file_location); - return -1; - } - set_ccs_logging(doc); - xmlFreeDoc(doc); - } else { - /* no cluster.conf file. 
This is fine, just need to get it from the network */ - if(no_manager_opt){ - log_printf(LOG_ERR, "\nNo local config file found: %s\n", config_file_location); - return -1; - } - } - - CCSEXIT("check_cluster_conf"); - return 0; -} - - -/** - * create_lockfile - create and lock a lock file - * @lockfile: location of lock file - * - * Returns: 0 on success, -1 otherwise - */ -static int create_lockfile(char *lockfile){ - int fd, error=0; - struct stat stat_buf; - struct flock lock; - char buffer[50]; - - CCSENTER("create_lockfile"); - - if(!strncmp(lockfile, "/var/run/cluster/", 17)){ - if(stat("/var/run/cluster", &stat_buf)){ - if(mkdir("/var/run/cluster", S_IRWXU)){ - log_printf(LOG_ERR, "Cannot create lockfile directory"); - error = -errno; - goto fail; - } - } else if(!S_ISDIR(stat_buf.st_mode)){ - log_printf(LOG_ERR, "/var/run/cluster is not a directory.\n" - "Cannot create lockfile.\n"); - error = -ENOTDIR; - goto fail; - } - } - - if((fd = open(lockfile, O_CREAT | O_WRONLY, - (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) < 0){ - log_printf(LOG_ERR, "Cannot create lockfile"); - error = -errno; - goto fail; - } - - lock.l_type = F_WRLCK; - lock.l_start = 0; - lock.l_whence = SEEK_SET; - lock.l_len = 0; - - if (fcntl(fd, F_SETLK, &lock) < 0) { - close(fd); - log_printf(LOG_ERR, "The ccsd process is already running.\n"); - error = -errno; - goto fail; - } - - if (ftruncate(fd, 0) < 0) { - close(fd); - error = -errno; - goto fail; - } - - sprintf(buffer, "%d\n", getpid()); - - if(write(fd, buffer, strlen(buffer)) < strlen(buffer)){ - close(fd); - unlink(lockfile); - error = -errno; - goto fail; - } - - fail: - CCSEXIT("create_lockfile"); - - /* leave fd open - rely on exit to close it */ - if(error){ - return error; - } else { - return 0; - } -} - - -/** - * parent_exit_handler: exit the parent - * @sig: the signal - * - */ -static void parent_exit_handler(int sig){ - CCSENTER("parent_exit_handler"); - exit_now=1; - CCSEXIT("parent_exit_handler"); -} - - -/** - * 
sig_handler - * @sig - * - * This handles signals which the daemon might receive. - */ -static void sig_handler(int sig){ - sigaddset(&signal_mask, sig); - ++signal_received; -} - -static void process_signal(int sig){ - int err; - - CCSENTER("sig_handler"); - - switch(sig) { - case SIGINT: - log_printf(LOG_INFO, "Stopping ccsd, SIGINT received.\n"); - err = EXIT_SUCCESS; - break; - case SIGQUIT: - log_printf(LOG_INFO, "Stopping ccsd, SIGQUIT received.\n"); - err = EXIT_SUCCESS; - break; - case SIGTERM: - log_printf(LOG_INFO, "Stopping ccsd, SIGTERM received.\n"); - err = EXIT_SUCCESS; - break; - case SIGHUP: - log_printf(LOG_INFO, "SIGHUP received.\n"); - log_printf(LOG_INFO, "Use ccs_tool for updates.\n"); - return; - break; - default: - log_printf(LOG_ERR, "Stopping ccsd, unknown signal %d received.\n", sig); - err = EXIT_FAILURE; - } - - CCSEXIT("sig_handler"); - exit(err); -} - - -static inline void process_signals(void) -{ - int x; - - if (!signal_received) - return; - - signal_received = 0; - - for (x = 1; x < _NSIG; x++) { - if (sigismember(&signal_mask, x)) { - sigdelset(&signal_mask, x); - process_signal(x); - } - } -} - - -/** - * daemonize - * - * This function will do the following: - * - daemonize, if required - * - set up the lockfile - * - set up logging - * - set up signal handlers - * It will cause the program to exit if there is a failure. 
- */ -static void daemonize(void){ - int error=0; - int pid; - - CCSENTER("daemonize"); - - if(flags & FLAG_NODAEMON){ - log_printf(LOG_DEBUG, "Entering non-daemon mode.\n"); - if((error = create_lockfile(lockfile_location))){ - goto fail; - } - } else { - log_printf(LOG_DEBUG, "Entering daemon mode.\n"); - - signal(SIGTERM, &parent_exit_handler); - - pid = fork(); - - if(pid < 0){ - log_printf(LOG_ERR, "Unable to fork().\n"); - error = pid; - goto fail; - } - - if(pid){ - int status; - while(!waitpid(pid, &status, WNOHANG) && !exit_now); - if(exit_now) { - exit(EXIT_SUCCESS); - } - - switch(WEXITSTATUS(status)){ - case EXIT_CLUSTER_FAIL: - log_printf(LOG_ERR, "Failed to connect to cluster manager.\n"); - break; - case EXIT_LOCKFILE: - log_printf(LOG_ERR, "Failed to create lockfile.\n"); - log_printf(LOG_ERR, "Hint: ccsd is already running.\n"); - break; - } - exit(EXIT_FAILURE); - } - ppid = getppid(); - setsid(); - if(chdir("/") < 0) - goto fail; - umask(0); - - close(0); close(1); close(2); - open("/dev/null", O_RDONLY); /* reopen stdin */ - open("/dev/null", O_WRONLY); /* reopen stdout */ - open("/dev/null", O_WRONLY); /* reopen stderr */ - - if((error = create_lockfile(lockfile_location))){ - exit(EXIT_LOCKFILE); - } - - /* Make the parent stop waiting */ - //log_printf(LOG_DEBUG, "Die early\n"); - //kill(getppid(), SIGTERM); - } - - signal(SIGINT, &sig_handler); - signal(SIGQUIT, &sig_handler); - signal(SIGTERM, &sig_handler); - signal(SIGHUP, &sig_handler); - signal(SIGPIPE, SIG_IGN); - sigemptyset(&signal_mask); - signal_received = 0; - - fail: - CCSEXIT("daemonize"); - - if(error){ - exit(EXIT_FAILURE); - } -} - - -/** - * print_start_msg - * - */ -static void print_start_msg(char *msg){ - CCSENTER("print_start_msg"); - /* We want the start message to print every time */ - log_printf(LOG_INFO, "Starting ccsd %s:\n", RELEASE_VERSION); - log_printf(LOG_INFO, " Built: "__DATE__" "__TIME__"\n"); - log_printf(LOG_INFO, " %s\n", REDHAT_COPYRIGHT); - if(msg){ - 
log_printf(LOG_INFO, "%s\n", msg); - } - CCSEXIT("print_start_msg"); -} - - -static int join_group(int sfd, int loopback, int port){ - int error = 0; - char *addr_string; - struct sockaddr_storage addr; - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; - - CCSENTER("join_group"); - - if(IPv6){ - if(!multicast_address || !strcmp("default", multicast_address)){ - addr_string = "ff02::3:1"; - } else { - addr_string = multicast_address; - } - inet_pton(AF_INET6, addr_string, &(addr6->sin6_addr)); - addr6->sin6_family = AF_INET6; - addr6->sin6_port = htons(port); - } else { - if(!strcmp("default", multicast_address)){ - addr_string = "224.0.2.5"; - } else { - addr_string = multicast_address; - } - inet_pton(AF_INET, addr_string, &(addr4->sin_addr)); - addr4->sin_family = AF_INET; - addr4->sin_port = htons(port); - } - - if(addr.ss_family == AF_INET){ - struct ip_mreq mreq; - - mreq.imr_multiaddr.s_addr = addr4->sin_addr.s_addr; - mreq.imr_interface.s_addr = INADDR_ANY; - - if(setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_LOOP, - &loopback, sizeof(loopback)) < 0){ - log_printf(LOG_ERR, "Unable to %s loopback.\n", loopback?"SET":"UNSET"); - error = -errno; - goto fail; - } - if(setsockopt(sfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, - (const void *)&mreq, sizeof(mreq)) < 0){ - log_printf(LOG_ERR, "Unable to add to membership.\n"); - error = -errno; - goto fail; - } - } else if(addr.ss_family == AF_INET6){ - struct ipv6_mreq mreq; - - memcpy(&mreq.ipv6mr_multiaddr, &(addr6->sin6_addr), sizeof(struct in6_addr)); - - mreq.ipv6mr_interface = 0; - - if(setsockopt(sfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, - &loopback, sizeof(loopback)) < 0){ - log_printf(LOG_ERR, "Unable to %s loopback.\n", loopback?"SET":"UNSET"); - error = -errno; - goto fail; - } - if(setsockopt(sfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, - (const void *)&mreq, sizeof(mreq)) < 0){ - log_printf(LOG_ERR, "Unable to add to membership: %s\n", strerror(errno)); - 
error = -errno; - goto fail; - } - } else { - log_printf(LOG_ERR, "Unknown address family.\n"); - error = -EINVAL; - } - fail: - CCSEXIT("join_group"); - return 0; -} - -int setup_local_socket(int backlog) -{ - int sock = -1; - struct sockaddr_un su; - mode_t om; - - CCSENTER("setup_local_socket"); - if (use_local == 0) - goto fail; - - sock = socket(PF_LOCAL, SOCK_STREAM, 0); - if (sock < 0) - goto fail; - - /* This is ours ;) */ - unlink(COMM_LOCAL_SOCKET); - om = umask(077); - su.sun_family = PF_LOCAL; - snprintf(su.sun_path, sizeof(su.sun_path), COMM_LOCAL_SOCKET); - - if (bind(sock, &su, sizeof(su)) < 0) { - umask(om); - goto fail; - } - umask(om); - - if (listen(sock, backlog) < 0) - goto fail; - - log_printf(LOG_DEBUG, "Set up local socket on %s\n", su.sun_path); - CCSEXIT("setup_local_socket"); - return sock; -fail: - if (sock >= 0) - close(sock); - CCSEXIT("setup_local_socket"); - return -1; -} diff --git a/ccs/daemon/cluster_mgr.c b/ccs/daemon/cluster_mgr.c deleted file mode 100644 index 236933f..0000000 --- a/ccs/daemon/cluster_mgr.c +++ /dev/null @@ -1,688 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "comm_headers.h" -#include "debug.h" -#include "misc.h" -#include "globals.h" -#include "libcman.h" - -typedef struct member_list { - int count; - int pad; - cman_node_t *nodes; -} member_list_t; - -static member_list_t *members = NULL; - -static member_list_t *get_member_list(cman_handle_t handle); -static void free_member_list(member_list_t *list); -static char *member_id_to_name(member_list_t *list, int node); -static int member_addr_to_id(member_list_t *list, struct sockaddr *addr); - -static int select_retry(int max_fd, fd_set *rfds, fd_set *wfds, fd_set *xfds, - struct timeval *timeout); - -static ssize_t read_retry(int fd, void *buf, int count, struct timeval *timeout); - -static int check_update_doc(xmlDocPtr tmp_doc) -{ - int error = 
0; - - char *str1 = NULL; - char *str2 = NULL; - - CCSENTER("check_update_doc"); - - if (!(str1 = get_cluster_name(tmp_doc))) { - log_printf(LOG_ERR, "Unable to get cluster name from new config file.\n"); - error = -EINVAL; - goto fail; - } - - if (master_doc && master_doc->od_doc && - !(str2 = get_cluster_name(master_doc->od_doc))) { - log_printf(LOG_DEBUG, "Unable to get cluster name from current master doc.\n"); - } - - if (str2 && strcmp(str1, str2)) { - log_printf(LOG_ERR, "Cluster names for current and update configs do not match.\n"); - log_printf(LOG_ERR, " Current cluster name:: <%s>\n", str2); - log_printf(LOG_ERR, " Proposed update name:: <%s>\n", str1); - error = -EINVAL; - goto fail; - } - - if (master_doc && master_doc->od_doc && - (get_doc_version(tmp_doc) <= get_doc_version(master_doc->od_doc))) { - log_printf(LOG_ERR, "Proposed updated config file does not have greater version number.\n"); - log_printf(LOG_ERR, " Current config_version :: %d\n", get_doc_version(master_doc->od_doc)); - log_printf(LOG_ERR, " Proposed config_version:: %d\n", get_doc_version(tmp_doc)); - error = -EINVAL; - } - -fail: - - if (str1) { - free(str1); - } - - if (str2) { - free(str2); - } - - CCSEXIT("check_update_doc"); - return error; -} - -static int handle_cluster_message(int fd) -{ - int error = 0; - int unlock = 0; - int socket = -1; - - FILE *fp = NULL; - char *buffer = NULL; - xmlDocPtr tmp_doc = NULL; - comm_header_t ch; - uint64_t nodeid; - mode_t old_mode; - socklen_t client_len; - - struct timeval tv; - struct sockaddr client_addr; - static uint64_t master_node = 0; - - CCSENTER("handle_cluster_message"); - - log_printf(LOG_DEBUG, "Cluster message on socket: %d\n", fd); - - client_len = sizeof(client_addr); - - if ((socket = accept(fd, &client_addr, &client_len)) < 0) { - log_printf(LOG_ERR, "Failed to accept connection.\n"); - goto fail; - } - - if ((nodeid = member_addr_to_id(members, &client_addr)) < 0) { - log_printf(LOG_ERR, "Unable to determine node 
ID.\n"); - goto fail; - } - - log_printf(LOG_DEBUG, "Accept socket: %d\n", socket); - - error = recv(socket, &ch, sizeof(comm_header_t), MSG_PEEK); - - if (error < 0) { - log_printf(LOG_ERR, "Failed to receive message from %s\n", - member_id_to_name(members, nodeid)); - goto fail; - } - - log_printf(LOG_DEBUG, "Message (%d bytes) received from %s\n", - error, member_id_to_name(members, nodeid)); - - swab_header(&ch); - - if (ch.comm_type != COMM_UPDATE) { - log_printf(LOG_ERR, "Unexpected communication type (%d)... ignoring.\n", - ch.comm_type); - error = -EINVAL; - goto fail; - } - - if (ch.comm_flags == COMM_UPDATE_NOTICE) { - buffer = malloc(ch.comm_payload_size + sizeof(comm_header_t)); - if (!buffer) { - log_printf(LOG_ERR, "Unable to allocate space to perform update.\n"); - error = -ENOMEM; - goto fail; - } - - log_printf(LOG_DEBUG, "Updated config size:: %d\n", ch.comm_payload_size); - - tv.tv_sec = 5; - tv.tv_usec = 0; - - error = read_retry(socket, buffer, ch.comm_payload_size + sizeof(comm_header_t), &tv); - - if (error < 0) { - log_printf(LOG_ERR, "Unable to retrieve updated config"); - goto fail; - } - - pthread_mutex_lock(&update_lock); - unlock = 1; - - log_printf(LOG_DEBUG, "Got lock 0\n"); - - tmp_doc = xmlParseMemory(buffer+sizeof(comm_header_t), ch.comm_payload_size); - - if (!tmp_doc) { - log_printf(LOG_ERR, "Unable to parse updated config file.\n"); - /* ATTENTION -- need better error code */ - error = -EIO; - goto fail; - } - - if ((error = check_update_doc(tmp_doc)) < 0) { - goto fail; - } - - old_mode = umask(026); - - fp = fopen(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update", "w"); - - umask(old_mode); - - if (!fp) { - log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); - error = -errno; - goto fail; - } - - if (xmlDocDump(fp, tmp_doc) < 0) { - log_printf(LOG_ERR, "Unable to write " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); - goto fail; - } - - log_printf(LOG_DEBUG, "Upload of new 
config file from %s complete.\n", - member_id_to_name(members, nodeid)); - - ch.comm_payload_size = 0; - ch.comm_flags = COMM_UPDATE_NOTICE_ACK; - - log_printf(LOG_DEBUG, "Sending COMM_UPDATE_NOTICE_ACK.\n"); - - swab_header(&ch); - - if ((error = write(socket, &ch, sizeof(comm_header_t))) < 0) { - log_printf(LOG_ERR, "Unable to send COMM_UPDATE_NOTICE_ACK.\n"); - goto fail; - } - - master_node = nodeid; - error = 0; - } - - else if(ch.comm_flags == COMM_UPDATE_COMMIT) { - - tv.tv_sec = 5; - tv.tv_usec = 0; - - error = read_retry(socket, &ch, sizeof(comm_header_t), &tv); - - if (master_node != nodeid) { - log_printf(LOG_ERR, "COMM_UPDATE_COMMIT received from node other than initiator.\n"); - log_printf(LOG_ERR, "Hint: There may be multiple updates happening at once.\n"); - error = -EPERM; - goto fail; - } - - pthread_mutex_lock(&update_lock); - - unlock = 1; - - log_printf(LOG_DEBUG, "Got lock 1\n"); - - tmp_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); - - if (!tmp_doc) { - log_printf(LOG_ERR, "Unable to parse updated config file.\n"); - /* ATTENTION -- need better error code */ - error = -EIO; - goto fail; - } - - if ((error = check_update_doc(tmp_doc)) < 0) { - goto fail; - } - - old_mode = umask(026); - - fp = fopen(DEFAULT_CONFIG_DIR "/." DEFAULT_CONFIG_FILE, "w"); - - umask(old_mode); - - if (!fp) { - log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/." DEFAULT_CONFIG_FILE); - error = -errno; - goto fail; - } - - if (xmlDocDump(fp, tmp_doc) < 0) { - log_printf(LOG_ERR, "Unable to write " DEFAULT_CONFIG_DIR "/." 
DEFAULT_CONFIG_FILE); - goto fail; - } - - rename(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update", DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); - - update_required = 1; - ch.comm_flags = COMM_UPDATE_COMMIT_ACK; - - log_printf(LOG_DEBUG, "Sending COMM_UPDATE_COMMIT_ACK.\n"); - - swab_header(&ch); - - if ((error = write(socket, &ch, sizeof(comm_header_t))) < 0) { - log_printf(LOG_ERR, "Unable to send COMM_UPDATE_NOTICE_ACK.\n"); - goto fail; - } - - error = 0; - } - -fail: - - if (fp) { - fclose(fp); - } - - if (socket >= 0) { - close(socket); - } - - if (buffer) { - free(buffer); - } - - if (tmp_doc) { - xmlFreeDoc(tmp_doc); - } - - if (unlock) { - pthread_mutex_unlock(&update_lock); - } - - CCSEXIT("handle_cluster_message"); - return error; -} - - -static void cman_callback(cman_handle_t handle, void *private, int reason, int arg) -{ - switch (reason) { - case CMAN_REASON_TRY_SHUTDOWN: - cman_replyto_shutdown(handle, 1); - break; - - case CMAN_REASON_STATECHANGE: - quorate = cman_is_quorate(handle); - free_member_list(members); - members = get_member_list(handle); - break; - - default: - break; - } -} - - -static int handle_cluster_event(cman_handle_t handle) -{ - CCSENTER("handle_cluster_event"); - - int rv = 1; - while (rv > 0) { - rv = cman_dispatch(handle, CMAN_DISPATCH_ALL); - } - if (rv < 0) { - return -1; - } - - CCSEXIT("handle_cluster_event"); - return 0; -} - - -static void cluster_communicator(void) -{ - int ccsd_fd = -1; - int cman_fd = -1; - int warn_user = 0; - int opt = 1; - int max_fd; - int n; - int flags; - - fd_set rset; - cman_handle_t handle = NULL; - - struct sockaddr_storage addr; - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; - int addr_size=0; - - CCSENTER("cluster_communicator"); - - memset(&addr, 0, sizeof(struct sockaddr_storage)); - - if (IPv6) { - if ((ccsd_fd = socket(PF_INET6, SOCK_STREAM, 0)) < 0) { - if(IPv6 == -1) { - log_printf(LOG_DEBUG, "Unable to 
create IPv6 socket:: %s\n", strerror(errno)); - IPv6=0; - } - } - } - - if (!IPv6) { - if ((ccsd_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - log_printf(LOG_ERR, "Unable to create IPv4 socket.\n"); - exit(EXIT_FAILURE); - } - } - - if (setsockopt(ccsd_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) { - log_printf(LOG_ERR, "Unable to set socket option"); - exit(EXIT_FAILURE); - } - - if(IPv6){ - addr_size = sizeof(struct sockaddr_in6); - addr6->sin6_family = AF_INET6; - addr6->sin6_addr = in6addr_any; - addr6->sin6_port = htons(cluster_base_port); - } else { - addr_size = sizeof(struct sockaddr_in); - addr4->sin_family = AF_INET; - addr4->sin_addr.s_addr = INADDR_ANY; - addr4->sin_port = htons(cluster_base_port); - } - - flags = fcntl(ccsd_fd, F_GETFD, 0); - flags |= FD_CLOEXEC; - fcntl(ccsd_fd, F_SETFD, flags); - - if (bind(ccsd_fd, (struct sockaddr *)&addr, addr_size) < 0) { - log_printf(LOG_ERR, "Unable to bind to socket.\n"); - close(ccsd_fd); - exit(EXIT_FAILURE); - } - - if (listen(ccsd_fd, 15) < 0) { - log_printf(LOG_ERR, "Unable to listen to socket.\n"); - close(ccsd_fd); - exit(EXIT_FAILURE); - } - -restart: - - while (handle == NULL) - { - handle = cman_init(NULL); - - if (handle == NULL) { - - warn_user++; - - if (!(warn_user % 30)) - { - log_printf(LOG_ERR, "Unable to connect to cluster infrastructure after %d seconds.\n", - warn_user); - } - - sleep(1); - } - } - - if (ppid) { - kill(ppid, SIGTERM); - ppid = 0; - } - - cman_start_notification(handle, cman_callback); - - quorate = cman_is_quorate(handle); - - log_printf(LOG_INFO, "Initial status:: %s\n", (quorate)? "Quorate" : "Inquorate"); - - members = get_member_list(handle); - - while (1) - { - FD_ZERO(&rset); - cman_fd = cman_get_fd(handle); - - FD_SET(ccsd_fd, &rset); - FD_SET(cman_fd, &rset); - - max_fd = (ccsd_fd > cman_fd) ? 
ccsd_fd : cman_fd; - - log_printf(LOG_DEBUG, "Waiting for cluster event.\n"); - - if ((n = select((max_fd + 1), &rset, NULL, NULL, NULL)) < 0) { - log_printf(LOG_ERR, "Select failed"); - continue; - } - - log_printf(LOG_DEBUG, "There are %d cluster messages waiting.\n", n); - - while (n) - { - log_printf(LOG_DEBUG, "There are %d messages remaining.\n", n); - - n--; - - if (FD_ISSET(ccsd_fd, &rset)) { - handle_cluster_message(ccsd_fd); - } - - if (FD_ISSET(cman_fd, &rset)) { - if (handle_cluster_event(handle)) { - cman_finish(handle); - handle = NULL; - goto restart; - } - } - } - } - - CCSEXIT("cluster_communicator"); -} - - -int start_cluster_monitor_thread(void) { - int error = 0; - pthread_t thread; - - CCSENTER("start_cluster_monitor_thread"); - - pthread_mutex_init(&update_lock, NULL); - - error = pthread_create(&thread, NULL, (void *)cluster_communicator, NULL); - - if (error) { - log_printf(LOG_ERR, "Failed to create thread: %s\n", strerror(-error)); - goto fail; - } - - pthread_detach(thread); - -fail: - - CCSEXIT("start_cluster_monitor_thread"); - return error; -} - - -static member_list_t *get_member_list(cman_handle_t handle) -{ - int count = 0; - - member_list_t *list = NULL; - cman_node_t *nodes = NULL; - - do - { - - if (nodes != NULL) { - free(nodes); - } - - count = cman_get_node_count(handle); - - if (count <= 0) { - return NULL; - } - - if (list == NULL) { - list = malloc(sizeof(*list)); - } - - if (list == NULL) { - return NULL; - } - - nodes = malloc(sizeof(*nodes) * count); - - if (nodes == NULL) { - free(list); - return NULL; - } - - memset(list, 0, sizeof(*list)); - memset(nodes, 0, sizeof(*nodes) * count); - - cman_get_nodes(handle, count, &list->count, nodes); - - } while (list->count != count); - - list->count = count; - list->nodes = nodes; - - return list; -} - - -static void free_member_list(member_list_t *list) -{ - if (list != NULL) { - if (list->nodes != NULL) { - free(list->nodes); - } - free(list); - } -} - - -static char 
*member_id_to_name(member_list_t *list, int node) -{ - int i; - - for (i = 0; i < list->count; i++) { - if (list->nodes[i].cn_nodeid == node) { - return list->nodes[i].cn_name; - } - } - - return NULL; -} - - -static int member_addr_to_id(member_list_t *list, struct sockaddr *addr) -{ - int i; - - for (i = 0; i < list->count; i++) { - if (memcmp(&list->nodes[i].cn_address.cna_address, addr, - sizeof(struct sockaddr))) { - - return list->nodes[i].cn_nodeid; - } - } - - return -1; -} - - -static int select_retry(int max_fd, fd_set *rfds, fd_set *wfds, fd_set *xfds, - struct timeval *timeout) -{ - int rv; - - while (1) { - rv = select(max_fd, rfds, wfds, xfds, timeout); - if ((rv == -1) && (errno == EINTR)) { - /* return on EBADF/EINVAL/ENOMEM; continue on EINTR */ - continue; - } - return rv; - } -} - - -static ssize_t read_retry(int fd, void *buf, int count, struct timeval *timeout) -{ - int n, total = 0, remain = count, rv = 0; - fd_set rfds, xfds; - - while (total < count) - { - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - FD_ZERO(&xfds); - FD_SET(fd, &xfds); - - /* - * Select on the socket, in case it closes while we're not - * looking... - */ - rv = select_retry(fd + 1, &rfds, NULL, &xfds, timeout); - if (rv == -1) { - return -1; - } - else if (rv == 0) { - errno = ETIMEDOUT; - return -1; - } - - if (FD_ISSET(fd, &xfds)) { - errno = EPIPE; - return -1; - } - - /* - * Attempt to read off the socket - */ - n = read(fd, buf + (off_t) total, remain); - - /* - * When we know our socket was select()ed and we receive 0 bytes - * when we read, the socket was closed. - */ - if ((n == 0) && (rv == 1)) { - errno = EPIPE; - return -1; - } - - if (n == -1) { - if ((errno == EAGAIN) || (errno == EINTR)) { - /* - * Not ready? 
Wait for data to become available - */ - continue; - } - - /* Other errors: EPIPE, EINVAL, etc */ - return -1; - } - - total += n; - remain -= n; - } - - return total; -} diff --git a/ccs/daemon/cluster_mgr.h b/ccs/daemon/cluster_mgr.h deleted file mode 100644 index b97b552..0000000 --- a/ccs/daemon/cluster_mgr.h +++ /dev/null @@ -1,6 +0,0 @@ -#ifndef __CLUSTER_MGR_DOT_H__ -#define __CLUSTER_MGR_DOT_H__ - -int start_cluster_monitor_thread(void); - -#endif /* __CLUSTER_MGR_DOT_H__ */ diff --git a/ccs/daemon/cnx_mgr.c b/ccs/daemon/cnx_mgr.c deleted file mode 100644 index 0dfa1cd..0000000 --- a/ccs/daemon/cnx_mgr.c +++ /dev/null @@ -1,1393 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "comm_headers.h" -#include "debug.h" -#include "misc.h" -#include "globals.h" - -/* Default descriptor expiration time, in seconds */ -#ifndef DEFAULT_EXPIRE -#define DEFAULT_EXPIRE 30 -#endif - -/* Maximum open connection count */ -#ifndef MAX_OPEN_CONNECTIONS -#define MAX_OPEN_CONNECTIONS 30 -#endif - -/* Conversion from descriptor to ocs index */ -#ifdef dindex -#undef dindex -#endif -#define dindex(x) ((x) % MAX_OPEN_CONNECTIONS) - -static inline void _cleanup_descriptor(int desc); - -extern int no_manager_opt; - -typedef struct open_connection_s { - char *oc_cwp; - char *oc_query; - open_doc_t *oc_odoc; - xmlXPathContextPtr oc_ctx; - int oc_index; - int oc_desc; - time_t oc_expire; -} open_connection_t; - -/* ATTENTION: need to lock on this if we start forking the daemon ** -** Also would need to create a shared memory area for open cnx's */ -static open_connection_t **ocs = NULL; -static int _descbase = 0; - -static int _update_config(char *location){ - int error = 0; - int v1=0, v2=0; - open_doc_t *tmp_odoc = NULL; - xmlDocPtr tmp_doc = NULL; - - CCSENTER("_update_config"); - - tmp_doc = 
xmlParseFile(location); - if(!tmp_doc){ - log_printf(LOG_ERR, "Unable to parse %s\n", location); - error = -EINVAL; - goto fail; - } else if((v2 = get_doc_version(tmp_doc)) < 0){ - log_printf(LOG_ERR, "Unable to get config_version from %s.\n", location); - error = v2; - goto fail; - } else if(master_doc && master_doc->od_doc){ - v1 = get_doc_version(master_doc->od_doc); - if(v1 >= v2){ - log_printf(LOG_ERR, "%s on-disk version is <= to in-memory version.\n", location); - log_printf(LOG_ERR, " On-disk version : %d\n", v2); - log_printf(LOG_ERR, " In-memory version : %d\n", v1); - error = -EPERM; - goto fail; - } - } else { - v1 = 0; - } - - if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ - error = -ENOMEM; - goto fail; - } - memset(tmp_odoc, 0, sizeof(open_doc_t)); - - tmp_odoc->od_doc = tmp_doc; - - log_printf(LOG_DEBUG, "There are %d references open on version %d of the config file.\n", - (master_doc)?master_doc->od_refs:0, v1); - if(master_doc && !master_doc->od_refs){ - log_printf(LOG_DEBUG, "Freeing version %d\n", v1); - xmlFreeDoc(master_doc->od_doc); - free(master_doc); - master_doc = tmp_odoc; - } else { - master_doc = tmp_odoc; - } - - log_printf(LOG_INFO, "Update of "DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE " complete (version %d -> %d).\n", v1, v2); - fail: - if(tmp_odoc != master_doc){ - free(tmp_odoc); - } - if(tmp_doc != master_doc->od_doc){ - xmlFreeDoc(tmp_doc); - } - - - CCSEXIT("_update_config"); - return error; -} - - -static int update_config(void){ - int error = 0; - CCSENTER("update_config"); - - /* If update_required is set, it means that there is still a pending ** - ** update. We need to pull this one in before doing anything else. */ - if(update_required){ - error = _update_config(DEFAULT_CONFIG_DIR "/." 
DEFAULT_CONFIG_FILE); - update_required = 0; - if(error){ - log_printf(LOG_ERR, "Previous update could not be completed.\n"); - goto fail; - } - } - - fail: - CCSEXIT("update_config"); - return error; -} - -/** - * broadcast_for_doc - * - * Returns: 0 on success, < 0 on error - */ -static int broadcast_for_doc(char *cluster_name, int blocking){ - int opt; - int error = 0; - int retry = 5; - int sfd = -1; - int trueint; - int v1, v2; - int write_to_disk = 0; - char *tmp_name = NULL; - struct sockaddr_storage addr, recv_addr; - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; - unsigned int len = sizeof(struct sockaddr_storage); - int addr_size = 0; - comm_header_t *ch = NULL; - char *bdoc = NULL; - fd_set rset; - struct timeval tv; - xmlDocPtr tmp_doc = NULL; - - CCSENTER("broadcast_for_doc"); - - try_again: - if(!master_doc){ - log_printf(LOG_ERR, "No master_doc!!!\n"); - exit(EXIT_FAILURE); - } - - if(quorate && !cluster_name){ - log_printf(LOG_ERR, "Node is part of quorate cluster, but the cluster name is unknown.\n"); - log_printf(LOG_ERR, " Unable to validate remote config files. 
Refusing connection.\n"); - error = -ECONNREFUSED; - goto fail; - } - - ch = malloc(sizeof(comm_header_t)); - if(!ch){ - error = -ENOMEM; - goto fail; - } - memset(ch, 0, sizeof(comm_header_t)); - - if(IPv6 && (sfd = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP)) <0){ - log_printf(LOG_ERR, "Unable to create IPv6 socket"); - error = -errno; - goto fail; - } - - if(!IPv6 && ((sfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)){ - log_printf(LOG_ERR, "Unable to create socket for broadcast"); - error = -errno; - goto fail; - } - - memset(&addr, 0, sizeof(struct sockaddr_storage)); - - trueint = 1; - if(IPv6){ - struct ipv6_mreq mreq; - - addr6->sin6_family = AF_INET6; - addr6->sin6_port = htons(backend_port); - - if(!multicast_address || !strcmp(multicast_address, "default")){ - log_printf(LOG_DEBUG, "Trying IPv6 multicast (default).\n"); - if(inet_pton(AF_INET6, "ff02::3:1", &(addr6->sin6_addr)) <= 0){ - log_printf(LOG_ERR, "Unable to convert multicast address"); - error = -errno; - goto fail; - } - } else { - log_printf(LOG_DEBUG, "Trying IPv6 multicast (%s).\n", multicast_address); - if(inet_pton(AF_INET6, multicast_address, &(addr6->sin6_addr)) <= 0){ - log_printf(LOG_ERR, "Unable to convert multicast address"); - error = -errno; - goto fail; - } - } - - memcpy(&mreq.ipv6mr_multiaddr, &(addr6->sin6_addr), sizeof(struct in6_addr)); - mreq.ipv6mr_interface = 0; - opt = 0; - - if(setsockopt(sfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, - &opt, sizeof(opt)) < 0){ - log_printf(LOG_ERR, "Unable to %s loopback.\n", opt?"SET":"UNSET"); - error = -errno; - goto fail; - } - } else { - addr4->sin_family = AF_INET; - addr4->sin_port = htons(backend_port); - if(!multicast_address){ - log_printf(LOG_DEBUG, "Trying IPv4 broadcast.\n"); - - addr4->sin_addr.s_addr = INADDR_BROADCAST; - if((error = setsockopt(sfd, SOL_SOCKET, SO_BROADCAST, &trueint, sizeof(int)))){ - log_printf(LOG_ERR, "Unable to set socket options"); - error = -errno; - goto fail; - } else { - log_printf(LOG_DEBUG, " 
Broadcast enabled.\n"); - } - } else { - if(!strcmp(multicast_address, "default")){ - log_printf(LOG_DEBUG, "Trying IPv4 multicast (default).\n"); - if(inet_pton(AF_INET, "224.0.2.5", &(addr4->sin_addr)) <= 0){ - log_printf(LOG_ERR, "Unable to convert multicast address"); - error = -errno; - goto fail; - } - } else { - log_printf(LOG_DEBUG, "Trying IPv4 multicast (%s).\n", multicast_address); - if(inet_pton(AF_INET, multicast_address, &(addr4->sin_addr)) <= 0){ - log_printf(LOG_ERR, "Unable to convert multicast address"); - error = -errno; - goto fail; - } - } - opt = 0; - setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt)); - if(setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0){ - log_printf(LOG_ERR, "Unable to set multicast threshold.\n"); - } - } - } - addr_size = IPv6? sizeof(struct sockaddr_in6):sizeof(struct sockaddr_in); - - FD_ZERO(&rset); - - do { - ch->comm_type = COMM_BROADCAST; - - log_printf(LOG_DEBUG, "Sending broadcast.\n"); - swab_header(ch); - - if(sendto(sfd, (char *)ch, sizeof(comm_header_t), 0, - (struct sockaddr *)&addr, addr_size) < 0){ - log_printf(LOG_ERR, "Unable to perform sendto"); - if(retry > 0){ - retry--; - close(sfd); - free(ch); - sleep(2); - goto try_again; - } else { - error = -errno; - goto fail; - } - } - - srandom(getpid()); - FD_SET(sfd, &rset); - tv.tv_sec = 0; - - tv.tv_usec = 250000 + (random()%500000); -#if defined(__sparc__) - log_printf(LOG_DEBUG, "Select waiting %d usec\n", tv.tv_usec); -#else - log_printf(LOG_DEBUG, "Select waiting %ld usec\n", tv.tv_usec); -#endif - while((error = select(sfd+1, &rset, NULL,NULL, &tv))){ - log_printf(LOG_DEBUG, "Select returns %d\n", error); - if(error < 0){ - log_printf(LOG_ERR, "Select failed"); - error = -errno; - goto fail; - } - if(error){ - log_printf(LOG_DEBUG, "Checking broadcast response.\n"); - error = 0; - recvfrom(sfd, (char *)ch, sizeof(comm_header_t), MSG_PEEK, - (struct sockaddr *)&recv_addr, (socklen_t *)&len); - swab_header(ch); - 
if(!ch->comm_payload_size || ch->comm_error){ - /* clean out this reply by not using MSG_PEEK */ - recvfrom(sfd, (char *)ch, sizeof(comm_header_t), 0, - (struct sockaddr *)&recv_addr, (socklen_t *)&len); - error = -ENODATA; - FD_SET(sfd, &rset); - goto reset_timer; - } - bdoc = malloc(ch->comm_payload_size + sizeof(comm_header_t)); - if(!bdoc){ - error = -ENOMEM; - goto fail; - } - memset(bdoc, 0, ch->comm_payload_size + sizeof(comm_header_t)); - /* ATTENTION -- potential for incomplete package */ - recvfrom(sfd, bdoc, ch->comm_payload_size + sizeof(comm_header_t), - 0, (struct sockaddr *)&recv_addr, &len); - tmp_doc = xmlParseMemory(bdoc+sizeof(comm_header_t), - ch->comm_payload_size); - if(!tmp_doc){ - log_printf(LOG_ERR, "Unable to parse remote configuration.\n"); - free(bdoc); bdoc = NULL; - goto reset_timer; - } - - tmp_name = get_cluster_name(tmp_doc); - log_printf(LOG_DEBUG, " Given cluster name = %s\n", cluster_name); - log_printf(LOG_DEBUG, " Remote cluster name= %s\n", tmp_name); - if(!tmp_name){ - log_printf(LOG_ERR, "Unable to find cluster name in remote configuration.\n"); - free(bdoc); bdoc = NULL; - xmlFreeDoc(tmp_doc); tmp_doc = NULL; - goto reset_timer; - } else if(cluster_name && strcmp(cluster_name, tmp_name)){ - log_printf(LOG_DEBUG, "Remote and local configuration have different cluster names.\n"); - log_printf(LOG_DEBUG, "Skipping...\n"); - free(tmp_name); tmp_name = NULL; - free(bdoc); bdoc = NULL; - xmlFreeDoc(tmp_doc); tmp_doc = NULL; - goto reset_timer; - } - free(tmp_name); tmp_name = NULL; - if(!master_doc->od_doc){ - if((v2 = get_doc_version(tmp_doc)) >= 0){ - log_printf(LOG_INFO, "Remote configuration copy (version = %d) found.\n", v2); - master_doc->od_doc = tmp_doc; - tmp_doc = NULL; - write_to_disk = 1; - } - } else { - if(((v1 = get_doc_version(master_doc->od_doc)) >= 0) && - ((v2 = get_doc_version(tmp_doc)) >= 0)){ - if(ch->comm_flags & COMM_BROADCAST_FROM_QUORATE){ - log_printf(LOG_INFO, "Remote configuration copy is from quorate 
node.\n"); - log_printf(LOG_INFO, " Local version # : %d\n", v1); - log_printf(LOG_INFO, " Remote version #: %d\n", v2); - if(v1 != v2){ - log_printf(LOG_INFO, "Switching to remote copy.\n"); - } - if(master_doc->od_refs){ - open_doc_t *tmp_odoc; - if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ - error = -ENOMEM; - goto fail; - } - memset(tmp_odoc, 0, sizeof(open_doc_t)); - tmp_odoc->od_doc = tmp_doc; - master_doc = tmp_odoc; - } else { - xmlFreeDoc(master_doc->od_doc); - master_doc->od_doc = tmp_doc; - } - tmp_doc = NULL; - write_to_disk = 1; - goto out; - } else if(v2 > v1){ - log_printf(LOG_INFO, "Remote configuration copy is newer than local copy.\n"); - log_printf(LOG_INFO, " Local version # : %d\n", v1); - log_printf(LOG_INFO, " Remote version #: %d\n", v2); - if(master_doc->od_refs){ - open_doc_t *tmp_odoc; - if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ - error = -ENOMEM; - goto fail; - } - memset(tmp_odoc, 0, sizeof(open_doc_t)); - tmp_odoc->od_doc = tmp_doc; - master_doc = tmp_odoc; - } else { - xmlFreeDoc(master_doc->od_doc); - master_doc->od_doc = tmp_doc; - } - tmp_doc = NULL; - write_to_disk = 1; - } - } else { - xmlFreeDoc(tmp_doc); - tmp_doc = NULL; - } - } - free(bdoc); bdoc = NULL; - } - FD_SET(sfd, &rset); - /* select will alter the timeout */ - reset_timer: - tv.tv_sec = 0; - tv.tv_usec = 250000 + (random()%500000); -#if defined(__sparc__) - log_printf(LOG_DEBUG, "Select waiting %d usec\n", tv.tv_usec); -#else - log_printf(LOG_DEBUG, "Select waiting %ld usec\n", tv.tv_usec); -#endif - } - } while(blocking && !master_doc); - out: - if(error){ - goto fail; - } - - if(write_to_disk){ - struct stat stat_buf; - mode_t old_mode; - FILE *f; - /* We did not have a copy available or we found a newer one, so write it out */ - - /* ATTENTION -- its bad if we fail here, because we have an in-memory version ** - ** but it has not been written to disk....................................... 
*/ - if(stat(DEFAULT_CONFIG_DIR, &stat_buf)){ - if(mkdir(DEFAULT_CONFIG_DIR, S_IRWXU | S_IRWXG)){ - log_printf(LOG_ERR, "Unable to create directory " DEFAULT_CONFIG_DIR); - error = -errno; - goto fail; - } - } else if(!S_ISDIR(stat_buf.st_mode)){ - log_printf(LOG_ERR, DEFAULT_CONFIG_DIR " is not a directory.\n"); - error = -ENOTDIR; - goto fail; - } - - old_mode = umask(026); - f = fopen(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE, "w"); - umask(old_mode); - if(!f){ - log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); - error = -errno; - goto fail; - } - if(xmlDocDump(f, master_doc->od_doc) < 0){ - error = -EIO; - fclose(f); - goto fail; - } - fclose(f); - } - - fail: - if(ch) free(ch); - if(bdoc) free(bdoc); - if(tmp_doc) xmlFreeDoc(tmp_doc); - if(sfd >= 0) close(sfd); - CCSEXIT("broadcast_for_doc"); - return error; -} - -/** - * process_connect: process a connect request - * @afd: accepted socket connection - * @cluster_name: optional cluster name - * - * Returns: 0 on success, < 0 on error - */ -static int process_connect(comm_header_t *ch, char *cluster_name){ - int i=0, error = 0; - int bcast_needed = 0; - char *tmp_name = NULL; - time_t now; - - CCSENTER("process_connect"); - - ch->comm_payload_size = 0; - - log_printf(LOG_DEBUG, "Given cluster name is = %s\n", cluster_name); - - if(!ocs){ - /* this will never be freed - unless exit */ - ocs = malloc(sizeof(open_connection_t *)*MAX_OPEN_CONNECTIONS); - if(!ocs){ - error = -ENOMEM; - goto fail; - } - memset(ocs, 0, sizeof(open_connection_t *)*MAX_OPEN_CONNECTIONS); - } - - if(!quorate && !(ch->comm_flags & COMM_CONNECT_FORCE)){ - log_printf(LOG_INFO, "Cluster is not quorate. Refusing connection.\n"); - error = -ECONNREFUSED; - goto fail; - } - - if(!master_doc){ - /* ATTENTION -- signal could come at any time. 
It may be better to ** - ** malloc to different var, then copy to master_doc when done */ - master_doc = malloc(sizeof(open_doc_t)); - if(!master_doc){ - error = -ENOMEM; - goto fail; - } - memset(master_doc, 0, sizeof(open_doc_t)); - } - - if(!master_doc->od_doc){ - master_doc->od_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); - if(!master_doc->od_doc){ - log_printf(LOG_INFO, "Unable to parse " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "\n"); - log_printf(LOG_INFO, "Searching cluster for valid copy.\n"); - } else if((error = get_doc_version(master_doc->od_doc)) < 0){ - log_printf(LOG_ERR, "Unable to get config_version from " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); - log_printf(LOG_ERR, "Discarding data and searching for valid copy.\n"); - xmlFreeDoc(master_doc->od_doc); - master_doc->od_doc = NULL; - } else if(!(tmp_name = get_cluster_name(master_doc->od_doc))){ - log_printf(LOG_ERR, "Unable to get cluster name from " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); - log_printf(LOG_ERR, "Discarding data and searching for valid copy.\n"); - xmlFreeDoc(master_doc->od_doc); - master_doc->od_doc = NULL; - } else if(cluster_name && strcmp(cluster_name, tmp_name)){ - log_printf(LOG_ERR, "Given cluster name does not match local " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); - log_printf(LOG_ERR, "Discarding data and searching for matching copy.\n"); - xmlFreeDoc(master_doc->od_doc); - master_doc->od_doc = NULL; - free(tmp_name); tmp_name = NULL; - } else if(set_ccs_logging(master_doc->od_doc) < 0){ - log_printf(LOG_ERR, "Unable to set logging parameters.\n"); - } else { /* Either the names match, or a name wasn't specified. */ - log_printf(LOG_INFO, DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE " (cluster name = %s, version = %d) found.\n", - tmp_name, error); - /* We must check with the others to make sure this is valid. 
*/ - } - if (!no_manager_opt) - bcast_needed = 1; - error = 0; - } else { - tmp_name = get_cluster_name(master_doc->od_doc); - - /* ATTENTION -- if not quorate, consider swapping out in-memory config ** - ** for the config of the name specified............................... */ - - if(cluster_name && strcmp(cluster_name, tmp_name)){ - log_printf(LOG_ERR, "Request for configuration with cluster name, %s\n", cluster_name); - log_printf(LOG_ERR, " However, a configuration with cluster name, %s, is already loaded.\n", - tmp_name); - error = -EINVAL; - goto fail; - } - if(!quorate){ - bcast_needed = 1; - } - } - - if(cluster_name && !tmp_name){ - tmp_name = strdup(cluster_name); - if(!tmp_name){ - error = -ENOMEM; - goto fail; - } - } - - log_printf(LOG_DEBUG, "Blocking is %s.\n", - (ch->comm_flags & COMM_CONNECT_BLOCKING)? "SET": "UNSET"); - log_printf(LOG_DEBUG, "Flags = 0x%x\n", ch->comm_flags); - - /* Need to broadcast regardless (unless quorate) to check version # */ - if(bcast_needed){ - log_printf(LOG_DEBUG, "Broadcast is neccessary.\n"); - } - if(bcast_needed && - (error = broadcast_for_doc(tmp_name, ch->comm_flags & COMM_CONNECT_BLOCKING)) && - !master_doc->od_doc){ - log_printf(LOG_ERR, "Broadcast for config file failed: %s\n", strerror(-error)); - goto fail; - } - error = 0; - - if(!master_doc || !master_doc->od_doc){ - log_printf(LOG_ERR, "The appropriate config file could not be loaded.\n"); - error = -ENODATA; - goto fail; - } - - if(update_required){ - log_printf(LOG_DEBUG, "Update is required.\n"); - if((error = update_config())){ - log_printf(LOG_ERR, "Failed to update config file, required by cluster.\n"); - /* ATTENTION -- remove all open_doc_t's ? 
*/ - goto fail; - } - } - - /* Locate the connection descriptor */ - now = time(NULL); - for(i=0; i < MAX_OPEN_CONNECTIONS; i++){ - if (!ocs[i]) - continue; - if (now >= ocs[i]->oc_expire) { - log_printf(LOG_DEBUG, "Recycling connection descriptor %d: Expired\n", - ocs[i]->oc_desc ); - _cleanup_descriptor(i); - } - } - - for(i=0; i < MAX_OPEN_CONNECTIONS; i++){ - if(!ocs[i]) - break; - } - - if(i >= MAX_OPEN_CONNECTIONS){ - error = -EAGAIN; - goto fail; - } - - ocs[i] = (open_connection_t *)malloc(sizeof(open_connection_t)); - if(!ocs[i]){ - error = -ENOMEM; - goto fail; - } - - memset(ocs[i], 0, sizeof(open_connection_t)); - - master_doc->od_refs++; - ocs[i]->oc_odoc = master_doc; - ocs[i]->oc_ctx = xmlXPathNewContext(ocs[i]->oc_odoc->od_doc); - ocs[i]->oc_expire = now + DEFAULT_EXPIRE; - - /* using error as a temp var */ - error = i + _descbase++ * MAX_OPEN_CONNECTIONS; - if (error > INT_MAX || error < 0) { - error = i; - _descbase = 0; - } - ocs[i]->oc_desc = error; - - /* reset error */ - error = 0; - - if(!ocs[i]->oc_ctx){ - ocs[i]->oc_odoc->od_refs--; - free(ocs[i]); - log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); - error = -EIO; /* ATTENTION -- what should this be? 
*/ - goto fail; - } - - /* return desc to requestor */ - - fail: - if(master_doc && master_doc->od_doc == NULL){ - free(master_doc); - master_doc = NULL; - } - if(tmp_name){ - free(tmp_name); - } - if(error){ - ch->comm_error = error; - } else { - ch->comm_desc = ocs[i]->oc_desc; - } - CCSEXIT("process_connect"); - return error; -} - - -static inline void -_cleanup_descriptor(int desc) -{ - open_doc_t *tmp_odoc; - - if(ocs[desc]->oc_ctx){ - xmlXPathFreeContext(ocs[desc]->oc_ctx); - } - if(ocs[desc]->oc_cwp){ - free(ocs[desc]->oc_cwp); - } - if(ocs[desc]->oc_query){ - free(ocs[desc]->oc_query); - } - tmp_odoc = ocs[desc]->oc_odoc; - if(tmp_odoc->od_refs < 1){ - log_printf(LOG_ERR, "Number of references on an open doc should never be < 1.\n"); - log_printf(LOG_ERR, "This is a fatal error. Exiting...\n"); - exit(EXIT_FAILURE); - } - if(tmp_odoc != master_doc && tmp_odoc->od_refs == 1){ - log_printf(LOG_DEBUG, "No more references on version %d of config file, freeing...\n", - get_doc_version(tmp_odoc->od_doc)); - xmlFreeDoc(tmp_odoc->od_doc); - free(tmp_odoc); - } else { - tmp_odoc->od_refs--; - } - - free(ocs[desc]); - ocs[desc] = NULL; -} - - -/** - * process_disconnect: close an open session - * @afd: accepted socket connection - * @desc: descriptor describing the open connection - * - * This fuction frees all memory associated with an open session. - * - * Returns: 0 on success, < 0 on error - */ -static int process_disconnect(comm_header_t *ch){ - int desc = dindex(ch->comm_desc); - int error=0; - CCSENTER("process_disconnect"); - - ch->comm_payload_size = 0; - - if(desc < 0){ - log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", desc); - log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); - error = -EBADR; - goto fail; - } - - if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ - /* send failure to requestor ? 
*/ - log_printf(LOG_ERR, "Attempt to close an unopened CCS descriptor (%d).\n", - ch->comm_desc); - - error = -EBADR; - goto fail; - } else { - _cleanup_descriptor(desc); - } - - fail: - if(error){ - ch->comm_error = error; - } else { - ch->comm_desc = -1; - } - CCSEXIT("process_disconnect"); - return error; -} - -/* - * _process_get - * @ch - * @payload - * - * This function runs the xml query. If the query is different from the - * previous query, it will always fill the payload with the first match. - * If the current query and the previous query are the same, it fills the - * payload with next match. If the last of all possible matches was - * returned by the previous query and the current query is the same, - * the payload will be filled with the 1st match and 1 will be returned - * as the result of the function. - * - * Returns: -EXXX on error, 1 if restarting list, 0 otherwise - */ -static int _process_get(comm_header_t *ch, char **payload){ - int error = 0, desc = dindex(ch->comm_desc); - xmlXPathObjectPtr obj = NULL; - char *query = NULL; - - CCSENTER("_process_get"); - if(!ch->comm_payload_size){ - log_printf(LOG_ERR, "process_get: payload size is zero.\n"); - error = -EINVAL; - goto fail; - } - - if(ch->comm_desc < 0){ - log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); - log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); - error = -EBADR; - goto fail; - } - - if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ - log_printf(LOG_ERR, "process_get: Invalid connection descriptor received.\n"); - error = -EBADR; - goto fail; - } - - if(ocs[desc]->oc_query && !strcmp(*payload,ocs[desc]->oc_query)){ - ocs[desc]->oc_index++; - log_printf(LOG_DEBUG, "Index = %d\n",ocs[desc]->oc_index); - log_printf(LOG_DEBUG, " Query = %s\n", *payload); - } else { - log_printf(LOG_DEBUG, "Index reset (new query).\n"); - log_printf(LOG_DEBUG, " Query = %s\n", *payload); - ocs[desc]->oc_index = 0; - if(ocs[desc]->oc_query){ - 
free(ocs[desc]->oc_query); - } - ocs[desc]->oc_query = (char *)strdup(*payload); - } - - /* ATTENTION -- should path expansion go before index inc ? */ - if(((ch->comm_payload_size > 1) && - ((*payload)[0] == '/')) || - !ocs[desc]->oc_cwp){ - log_printf(LOG_DEBUG, "Query involves absolute path or cwp is not set.\n"); - query = (char *)strdup(*payload); - if(!query){ - error = -ENOMEM; - goto fail; - } - } else { - /* +2 because of NULL and '/' character */ - log_printf(LOG_DEBUG, "Query involves relative path.\n"); - query = malloc(strlen(*payload)+strlen(ocs[desc]->oc_cwp)+2); - if(!query){ - error = -ENOMEM; - goto fail; - } - sprintf(query, "%s/%s", ocs[desc]->oc_cwp, *payload); - } - - /* Bump expiration time */ - ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; - - obj = xmlXPathEvalExpression((xmlChar *)query, ocs[desc]->oc_ctx); - if(obj){ - log_printf(LOG_DEBUG, "Obj type = %d (%s)\n", obj->type, (obj->type == 1)?"XPATH_NODESET":""); - log_printf(LOG_DEBUG, "Number of matches: %d\n", (obj->nodesetval)?obj->nodesetval->nodeNr:0); - if(obj->nodesetval && (obj->nodesetval->nodeNr > 0) ){ - xmlNodePtr node; - int size=0; - int nnv=0; /* name 'n' value */ - - if(ocs[desc]->oc_index >= obj->nodesetval->nodeNr){ - ocs[desc]->oc_index = 0; - error = 1; - log_printf(LOG_DEBUG, "Index reset to zero (end of list).\n"); - } - - node = obj->nodesetval->nodeTab[ocs[desc]->oc_index]; - - log_printf(LOG_DEBUG, "Node (%s) type = %d (%s)\n", node->name, node->type, - (node->type == 1)? "XML_ELEMENT_NODE": - (node->type == 2)? 
"XML_ATTRIBUTE_NODE":""); - - if(!node) { - log_printf(LOG_DEBUG, "No content found.\n"); - error = -ENODATA; - goto fail; - } - - if(((node->type == XML_ATTRIBUTE_NODE) && strstr(query, "@*")) || - ((node->type == XML_ELEMENT_NODE) && strstr(query, "child::*"))){ - /* add on the trailing NULL and the '=' separator for a list of attrs - or an element node + CDATA*/ - if (node->children && node->children->content) - size = strlen((char *)node->children->content) + - strlen((char *)node->name)+2; - else - size = strlen((char *)node->name)+2; - nnv= 1; - } else { - if (node->children && node->children->content) { - size = strlen((char *)node->children->content)+1; - } else { - error = -ENODATA; - goto fail; - } - } - - if(size <= ch->comm_payload_size){ /* do we already have enough space? */ - log_printf(LOG_DEBUG, "No extra space needed.\n"); - if(nnv){ - sprintf(*payload, "%s=%s", node->name, node->children ? - (char *)node->children->content:""); - } else { - sprintf(*payload, "%s", node->children ? node->children->content : - node->name); - } - - } else { - log_printf(LOG_DEBUG, "Extra space needed.\n"); - free(*payload); - *payload = (char *)malloc(size); - if(!*payload){ - error = -ENOMEM; - goto fail; - } - if(nnv){ - sprintf(*payload, "%s=%s", node->name, node->children ? - (char *)node->children->content:""); - } else { - sprintf(*payload, "%s", node->children ? 
node->children->content : - node->name); - } - } - log_printf(LOG_DEBUG, "Query results:: %s\n", *payload); - ch->comm_payload_size = size; - } else { - log_printf(LOG_DEBUG, "No nodes found.\n"); - ch->comm_payload_size = 0; - error = -ENODATA; - goto fail; - } - } else { - log_printf(LOG_ERR, "Error: unable to evaluate xpath query \"%s\"\n", *payload); - error = -EINVAL; - goto fail; - } - - fail: - if(obj){ - xmlXPathFreeObject(obj); - } - if(error < 0){ - ch->comm_error = error; - ch->comm_payload_size = 0; - } - if(query) { free(query); } - CCSEXIT("_process_get"); - return error; -} - -static int process_get(comm_header_t *ch, char **payload){ - int error; - CCSENTER("process_get"); - - error = _process_get(ch, payload); - - CCSEXIT("process_get"); - return (error < 0)? error: 0; -} - -static int process_get_list(comm_header_t *ch, char **payload){ - int error; - CCSENTER("process_get_list"); - - error = _process_get(ch, payload); - if(error){ - ch->comm_payload_size = 0; - if(ocs && ocs[dindex(ch->comm_desc)]) - ocs[dindex(ch->comm_desc)]->oc_index = -1; - } - - CCSEXIT("process_get_list"); - return (error < 0)? 
error: 0; -} - -static int process_set(comm_header_t *ch, char *payload){ - int error = 0; - int desc = dindex(ch->comm_desc); - - CCSENTER("process_set"); - if(!ch->comm_payload_size){ - log_printf(LOG_ERR, "process_set: payload size is zero.\n"); - error = -EINVAL; - goto fail; - } - - if(ch->comm_desc < 0){ - log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); - log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); - error = -EBADR; - goto fail; - } - - if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ - log_printf(LOG_ERR, "process_set: Invalid connection descriptor received.\n"); - error = -EBADR; - goto fail; - } - - error = -ENOSYS; - - fail: - free(payload); - ch->comm_payload_size = 0; - if(error){ - ch->comm_error = error; - } - CCSEXIT("process_set"); - return error; -} - - -static int process_get_state(comm_header_t *ch, char **payload){ - int error = 0, desc = dindex(ch->comm_desc); - char *load = NULL; - - CCSENTER("process_get_state"); - if(ch->comm_payload_size){ - log_printf(LOG_ERR, "process_get_state: payload size is nonzero.\n"); - error = -EINVAL; - goto fail; - } - - if(ch->comm_desc < 0){ - log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); - log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); - error = -EBADR; - goto fail; - } - - if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ - log_printf(LOG_ERR, "process_get_state: Invalid connection descriptor received.\n"); - error = -EBADR; - goto fail; - } - - if(ocs[desc]->oc_cwp && ocs[desc]->oc_query){ - int size = strlen(ocs[desc]->oc_cwp) + - strlen(ocs[desc]->oc_query) + 2; - log_printf(LOG_DEBUG, "Both cwp and query are set.\n"); - load = malloc(size); - if(!load){ - error = -ENOMEM; - goto fail; - } - strcpy(load, ocs[desc]->oc_cwp); - strcpy(load+strlen(ocs[desc]->oc_cwp)+1, ocs[desc]->oc_query); - ch->comm_payload_size = size; - } else if(ocs[desc]->oc_cwp){ - log_printf(LOG_DEBUG, 
"Only cwp is set.\n"); - load = (char *)strdup(ocs[desc]->oc_cwp); - if(!load){ - error = -ENOMEM; - goto fail; - } - ch->comm_payload_size = strlen(load)+1; - } else if(ocs[desc]->oc_query){ - int size = strlen(ocs[desc]->oc_query) + 2; - log_printf(LOG_DEBUG, "Only query is set.\n"); - load = malloc(size); - if(!load){ - error = -ENOMEM; - goto fail; - } - memset(load, 0, size); - strcpy(load+1, ocs[desc]->oc_query); - ch->comm_payload_size = size; - } - - ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; - *payload = load; - - fail: - if(error){ - if(load) { free(load); } - ch->comm_error = error; - ch->comm_payload_size = 0; - } - CCSEXIT("process_get_state"); - return error; -} - - -static int process_set_state(comm_header_t *ch, char *payload){ - int error = 0, desc = dindex(ch->comm_desc); - - CCSENTER("process_set_state"); - if(!ch->comm_payload_size){ - log_printf(LOG_ERR, "process_set_state: payload size is zero.\n"); - error = -EINVAL; - goto fail; - } - - if(ch->comm_desc < 0){ - log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); - log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); - error = -EBADR; - goto fail; - } - - if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ - log_printf(LOG_ERR, "process_set_state: Invalid connection descriptor received.\n"); - error = -EBADR; - goto fail; - } - - if(ocs[desc]->oc_cwp){ - free(ocs[desc]->oc_cwp); - ocs[desc]->oc_cwp = NULL; - } - - if((ch->comm_flags & COMM_SET_STATE_RESET_QUERY) && ocs[desc]->oc_query){ - free(ocs[desc]->oc_query); - ocs[desc]->oc_query = NULL; - } - - ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; - ocs[desc]->oc_cwp = (char *)strdup(payload); - - fail: - ch->comm_payload_size = 0; - if(error){ - ch->comm_error = error; - } - - CCSEXIT("process_set_state"); - return error; -} - - -/** - * process_request - * @afd - * - * This function operates as a switch, passing the request to the - * appropriate function. 
- * - * Returns: 0 on success, < 0 on error - */ -int process_request(int afd){ - int error=0; - comm_header_t *ch = NULL, *tmp_ch; - char *payload = NULL; - - CCSENTER("process_request"); - - if(!(ch = (comm_header_t *)malloc(sizeof(comm_header_t)))){ - error = -ENOMEM; - goto fail; - } - - error = read(afd, ch, sizeof(comm_header_t)); - if(error < 0){ - log_printf(LOG_ERR, "Unable to read comm_header_t"); - goto fail; - } else if(error < sizeof(comm_header_t)){ - log_printf(LOG_ERR, "Unable to read complete comm_header_t.\n"); - error = -EBADE; - goto fail; - } - - if(ch->comm_payload_size){ - if(!(payload = (char *)malloc(ch->comm_payload_size))){ - error = -ENOMEM; - goto fail; - } - error = read(afd, payload, ch->comm_payload_size); - if(error < 0){ - log_printf(LOG_ERR, "Unable to read payload"); - goto fail; - } else if(error < ch->comm_payload_size){ - log_printf(LOG_ERR, "Unable to read complete payload.\n"); - error = -EBADE; - goto fail; - } - } - - switch(ch->comm_type){ - case COMM_CONNECT: - if((error = process_connect(ch, payload)) < 0){ - log_printf(LOG_ERR, "Error while processing connect: %s\n", strerror(-error)); - goto fail; - } - break; - case COMM_DISCONNECT: - if((error = process_disconnect(ch)) < 0){ - log_printf(LOG_ERR, "Error while processing disconnect: %s\n", strerror(-error)); - goto fail; - } - break; - case COMM_GET: - if((error = process_get(ch, &payload)) < 0){ - if(error != -ENODATA){ - log_printf(LOG_ERR, "Error while processing get: %s\n", strerror(-error)); - } - goto fail; - } - break; - case COMM_GET_LIST: - if((error = process_get_list(ch, &payload)) < 0){ - if(error != -ENODATA){ - log_printf(LOG_ERR, "Error while processing get: %s\n", strerror(-error)); - } - goto fail; - } - break; - case COMM_SET: - if((error = process_set(ch, payload)) < 0){ - log_printf(LOG_ERR, "Error while processing set: %s\n", strerror(-error)); - goto fail; - } - break; - case COMM_GET_STATE: - if((error = process_get_state(ch, &payload)) < 0){ - 
log_printf(LOG_ERR, "Error while processing get_state: %s\n", strerror(-error)); - goto fail; - } - break; - case COMM_SET_STATE: - if((error = process_set_state(ch, payload)) < 0){ - log_printf(LOG_ERR, "Error while processing set_state: %s\n", strerror(-error)); - goto fail; - } - break; - default: - log_printf(LOG_ERR, "Unknown connection request received.\n"); - error = -EINVAL; - ch->comm_error = error; - ch->comm_payload_size = 0; - } - - if(ch->comm_payload_size){ - log_printf(LOG_DEBUG, "Reallocating transfer buffer.\n"); - tmp_ch = (comm_header_t *) - realloc(ch,sizeof(comm_header_t)+ch->comm_payload_size); - - if(tmp_ch) { ch = tmp_ch; } else { - log_printf(LOG_ERR, "Not enough memory to complete request.\n"); - error = -ENOMEM; - goto fail; - } - memcpy((char *)ch+sizeof(comm_header_t), payload, ch->comm_payload_size); - } - - fail: - error = write(afd, ch, sizeof(comm_header_t)+ch->comm_payload_size); - if(error < 0){ - if (errno == EINTR) - goto fail; - if (errno == EPIPE) { - error = 0; - } else { - log_printf(LOG_ERR, "Unable to write package back to sender"); - } - } else if(error < (sizeof(comm_header_t)+ch->comm_payload_size)){ - log_printf(LOG_ERR, "Unable to write complete package.\n"); - error = -EBADE; - goto fail; - } else { - error = 0; - } - - if(ch){ free(ch); } - if(payload){ free(payload); } - - CCSEXIT("process_request"); - return error; -} - - -/** - * process_broadcast - * @sfd: the UDP socket - * - * Returns: 0 on success, < 0 on failure - */ -int process_broadcast(int sfd){ - int error = 0; - comm_header_t *ch = NULL; - xmlChar *payload = NULL; - char *buffer = NULL; - struct sockaddr_storage addr; - unsigned int len = sizeof(struct sockaddr_storage); /* value/result for recvfrom */ - int sendlen; - int discard = 0; - - CCSENTER("process_broadcast"); - - ch = malloc(sizeof(comm_header_t)); - if(!ch){ - error = -ENOMEM; - goto fail; - } - memset(ch, 0, sizeof(comm_header_t)); - memset(&addr, 0, sizeof(struct sockaddr_storage)); /* 
just to make sure */ - - log_printf(LOG_DEBUG, "Waiting to receive broadcast request.\n"); - if(recvfrom(sfd, ch, sizeof(comm_header_t), 0, (struct sockaddr *)&addr, &len) < 0){ - log_printf(LOG_ERR, "Unable to perform recvfrom"); - error = -errno; - goto fail; - } - swab_header(ch); - - if(ch->comm_type != COMM_BROADCAST){ - /* Either someone is pinging this port, or there is an older version ** - ** of ccs trying to get bcast response. Either way, we should not ** - ** respond to them.................................................. */ - log_printf(LOG_DEBUG, "Received invalid request on broadcast port. %x\n",ch->comm_type); - error = -EINVAL; - goto fail; - } - - /* need to ignore my own broadcasts */ - - if(ch->comm_payload_size){ - /* cluster name was sent, need to read it */ - } - - if(!master_doc){ - discard = 1; - log_printf(LOG_DEBUG, "master_doc not loaded. Attempting to load it.\n"); - if(!(master_doc = malloc(sizeof(open_doc_t)))){ - error = -ENOMEM; - goto fail; - } - memset(master_doc, 0, sizeof(open_doc_t)); - master_doc->od_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); - if(!master_doc->od_doc){ - free(master_doc); - master_doc = NULL; - log_printf(LOG_ERR, "Unable to parse " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); - error = -ENODATA; - goto fail; - } - log_printf(LOG_DEBUG, "master_doc found and loaded.\n"); - } else if(update_required){ - log_printf(LOG_DEBUG, "Update is required.\n"); - if((error = update_config())){ - log_printf(LOG_ERR, "Failed to update config file, required by cluster.\n"); - /* ATTENTION -- remove all open_doc_t's ? 
*/ - goto fail; - } - } - - /* allocates space for the payload */ - xmlDocDumpFormatMemory(master_doc->od_doc, - &payload, - &(ch->comm_payload_size), - 0); - if(!ch->comm_payload_size){ - error = -ENOMEM; - log_printf(LOG_ERR, "Document dump to memory failed.\n"); - goto fail; - } - - buffer = malloc(ch->comm_payload_size + sizeof(comm_header_t)); - if(!buffer){ - error = -ENOMEM; - goto fail; - } - - if(quorate){ - ch->comm_flags |= COMM_BROADCAST_FROM_QUORATE; - } - - swab_header(ch); - memcpy(buffer, ch, sizeof(comm_header_t)); - swab_header(ch); /* Swab back to dip into ch for payload_size */ - memcpy(buffer+sizeof(comm_header_t), payload, ch->comm_payload_size); - - log_printf(LOG_DEBUG, "Sending configuration (version %d)...\n", get_doc_version(master_doc->od_doc)); - sendlen = ch->comm_payload_size + sizeof(comm_header_t); - if(sendto(sfd, buffer, sendlen, 0, - (struct sockaddr *)&addr, (socklen_t)len) < 0){ - log_printf(LOG_ERR, "Sendto failed"); - error = -errno; - } - - fail: - if(buffer) free(buffer); - if(payload) free(payload); - if(ch) free(ch); - if(discard){ - if(master_doc && master_doc->od_doc) - xmlFreeDoc(master_doc->od_doc); - if(master_doc) free(master_doc); - master_doc = NULL; - } - CCSEXIT("process_broadcast"); - return error; -} diff --git a/ccs/daemon/cnx_mgr.h b/ccs/daemon/cnx_mgr.h deleted file mode 100644 index 220628a..0000000 --- a/ccs/daemon/cnx_mgr.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef __CNX_MGR_DOT_H__ -#define __CNX_MGR_DOT_H__ - -int process_request(int afd); -int process_broadcast(int sfd); - -#endif /* __CNX_MGR_DOT_H__ */ diff --git a/ccs/daemon/comm_headers.h b/ccs/daemon/comm_headers.h deleted file mode 100644 index 4187fba..0000000 --- a/ccs/daemon/comm_headers.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef __COMM_HEADERS_DOT_H__ -#define __COMM_HEADERS_DOT_H__ - -#include -#include - -/* Types of requests */ -#define COMM_CONNECT 1 -#define COMM_DISCONNECT 2 -#define COMM_GET 3 -#define COMM_GET_LIST 4 -#define COMM_SET 5 
-#define COMM_GET_STATE 6 -#define COMM_SET_STATE 7 -#define COMM_BROADCAST 8 -#define COMM_UPDATE 9 - -/* Request flags */ -#define COMM_CONNECT_FORCE 1 -#define COMM_CONNECT_BLOCKING 2 -#define COMM_SET_STATE_RESET_QUERY 4 -#define COMM_BROADCAST_FROM_QUORATE 8 -#define COMM_UPDATE_NOTICE 16 -#define COMM_UPDATE_NOTICE_ACK 32 -#define COMM_UPDATE_COMMIT 64 -#define COMM_UPDATE_COMMIT_ACK 128 - -typedef struct comm_header_s { - int comm_type; - int comm_flags; /* flags that tune a particular type of operation */ - int comm_desc; - int comm_error; - int comm_payload_size; -} comm_header_t; - -#define COMM_LOCAL_SOCKET "/var/run/cluster/ccsd.sock" - -static inline void swab_header(comm_header_t *head) { -#if __BYTE_ORDER == __BIG_ENDIAN - head->comm_type = bswap_32(head->comm_type); - head->comm_flags = bswap_32(head->comm_flags); - head->comm_desc = bswap_32(head->comm_desc); - head->comm_error = bswap_32(head->comm_error); - head->comm_payload_size = bswap_32(head->comm_payload_size); -#endif -} - -#endif /* __COMM_HEADERS_DOT_H__ */ diff --git a/ccs/daemon/debug.h b/ccs/daemon/debug.h deleted file mode 100644 index 4ed365c..0000000 --- a/ccs/daemon/debug.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef __DEBUG_DOT_H__ -#define __DEBUG_DOT_H__ - -#define CCSENTER(x) log_printf(LOG_DEBUG, "Entering " x "\n") -#define CCSEXIT(x) log_printf(LOG_DEBUG, "Exiting " x "\n") - -extern int debug; - -#endif /* __DEBUG_DOT_H__ */ diff --git a/ccs/daemon/globals.c b/ccs/daemon/globals.c deleted file mode 100644 index 6f2e582..0000000 --- a/ccs/daemon/globals.c +++ /dev/null @@ -1,19 +0,0 @@ -#include - -int ppid = 0; - -char *config_file_location = NULL; -char *lockfile_location = NULL; - -int frontend_port = 50006; -int backend_port = 50007; -int cluster_base_port = 50008; - -/* -1 = no preference, 0 = IPv4, 1 = IPv6 */ -int IPv6=-1; - -/* 1 = allow and use UNIX domain sockets for local ccs queries */ -int use_local = 1; - -char *multicast_address = NULL; -int ttl=1; diff --git 
a/ccs/daemon/globals.h b/ccs/daemon/globals.h deleted file mode 100644 index 91fbe53..0000000 --- a/ccs/daemon/globals.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef __GLOBALS_H__ -#define __GLOBALS_H__ - -#define DEFAULT_CCSD_LOCKFILE "/var/run/cluster/ccsd.pid" - -#define EXIT_MAGMA_PLUGINS 2 /* Magma plugins are not available */ -#define EXIT_CLUSTER_FAIL 3 /* General failure to connect to cluster */ -#define EXIT_LOCKFILE 4 /* Failed to create lock file */ - -extern int ppid; - -extern char *config_file_location; -extern char *lockfile_location; - -extern int frontend_port; -extern int backend_port; -extern int cluster_base_port; - -extern int IPv6; -extern int use_local; -extern char *multicast_address; -extern int ttl; -#endif /* __GLOBALS_H__ */ diff --git a/ccs/daemon/misc.c b/ccs/daemon/misc.c deleted file mode 100644 index 543dd31..0000000 --- a/ccs/daemon/misc.c +++ /dev/null @@ -1,288 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "comm_headers.h" -#include "debug.h" -#include "misc.h" - -volatile int quorate = 0; - -int update_required = 0; -pthread_mutex_t update_lock; - -open_doc_t *master_doc = NULL; - -/** - * do_simple_xml_query - * @ctx: xml context - * @query: "/cluster/@name" - * - * it only handles this kind of query - */ -static char *do_simple_xml_query(xmlXPathContextPtr ctx, char *query) { - xmlXPathObjectPtr obj = NULL; - xmlNodePtr node = NULL; - - CCSENTER("do_simple_xml_query"); - - obj = xmlXPathEvalExpression((xmlChar *)query, ctx); - if(!obj || !obj->nodesetval || (obj->nodesetval->nodeNr != 1)) - log_printf(LOG_DEBUG, "Error processing query: %s.\n", query); - else { - node = obj->nodesetval->nodeTab[0]; - if(node->type != XML_ATTRIBUTE_NODE) - log_printf(LOG_DEBUG, "Object returned is not of attribute type.\n"); - else { - if(!node->children->content || !strlen((char 
*)node->children->content)) - log_printf(LOG_DEBUG, "No content found.\n"); - else { - CCSEXIT("do_simple_xml_query"); - return strdup((char *)node->children->content); - } - } - } - - if(obj) - xmlXPathFreeObject(obj); - - CCSEXIT("do_simple_xml_query"); - return NULL; -} - -int get_doc_version(xmlDocPtr ldoc){ - int i; - int error = 0; - xmlXPathContextPtr ctx = NULL; - char *res = NULL; - - CCSENTER("get_doc_version"); - - ctx = xmlXPathNewContext(ldoc); - if(!ctx){ - log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); - error = -EIO; /* ATTENTION -- what should this be? */ - goto fail; - } - - res = do_simple_xml_query(ctx, "/cluster/@config_version"); - if(res) { - for(i=0; i < strlen(res); i++){ - if(!isdigit(res[i])){ - log_printf(LOG_ERR, "config_version is not a valid integer.\n"); - error = -EINVAL; - goto fail; - } - } - error = atoi(res); - } else - error = -EINVAL; - -fail: - - if(res) - free(res); - - if(ctx){ - xmlXPathFreeContext(ctx); - } - - CCSEXIT("get_doc_version"); - return error; -} - - -/** - * get_cluster_name - * @ldoc: - * - * The caller must remember to free the string that is returned. - * - * Returns: NULL on failure, (char *) otherwise - */ -char *get_cluster_name(xmlDocPtr ldoc){ - int error = 0; - char *rtn = NULL; - xmlXPathContextPtr ctx = NULL; - - CCSENTER("get_cluster_name"); - - ctx = xmlXPathNewContext(ldoc); - if(!ctx){ - log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); - error = -EIO; /* ATTENTION -- what should this be? */ - goto fail; - } - - rtn = do_simple_xml_query(ctx, "/cluster/@name"); - -fail: - - if(ctx){ - xmlXPathFreeContext(ctx); - } - CCSEXIT("get_cluster_name"); - return rtn; -} - -/** - * set_ccs_logging - * @ldoc: - * - * Returns: -1 on failure. NULL on success. 
- */ -int set_ccs_logging(xmlDocPtr ldoc){ - int facility = SYSLOGFACILITY, loglevel = SYSLOGLEVEL, global_debug = 0; - char *res = NULL, *error = NULL; - xmlXPathContextPtr ctx = NULL; - unsigned int logmode; - - CCSENTER("set_ccs_logging"); - - ctx = xmlXPathNewContext(ldoc); - if(!ctx){ - log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); - return -1; - } - - logmode = logsys_config_mode_get(); - - if(!debug) { - res = do_simple_xml_query(ctx, "/cluster/logging/@debug"); - if(res) { - if(!strcmp(res, "on")) { - global_debug = 1; - } else - if(!strcmp(res, "off")) { - global_debug = 0; - } else - log_printf(LOG_ERR, "global debug: unknown value\n"); - free(res); - res=NULL; - } - - res = do_simple_xml_query(ctx, "/cluster/logging/logger_subsys[@subsys=\"CCS\"]/@debug"); - if(res) { - if(!strcmp(res, "on")) { - debug = 1; - } else - if(!strcmp(res, "off")) { /* debug from cmdline/envvars override config */ - debug = 0; - } else - log_printf(LOG_ERR, "subsys debug: unknown value\n"); - free(res); - res=NULL; - } else - debug = global_debug; /* global debug overrides subsystem only if latter is not specified */ - - res = do_simple_xml_query(ctx, "/cluster/logging/logger_subsys[@subsys=\"CCS\"]/@syslog_level"); - if(res) { - loglevel = logsys_priority_id_get (res); - if (loglevel < 0) - loglevel = SYSLOGLEVEL; - - if(!debug) { - if(loglevel == LOG_LEVEL_DEBUG) - debug = 1; - - logsys_config_priority_set (loglevel); - } - - free(res); - res=NULL; - } - } else - logsys_config_priority_set (LOG_LEVEL_DEBUG); - - res = do_simple_xml_query(ctx, "/cluster/logging/@to_stderr"); - if(res) { - if(!strcmp(res, "yes")) { - logmode |= LOG_MODE_OUTPUT_STDERR; - } else - if(!strcmp(res, "no")) { - logmode &= ~LOG_MODE_OUTPUT_STDERR; - } else - log_printf(LOG_ERR, "to_stderr: unknown value\n"); - free(res); - res=NULL; - } - - res = do_simple_xml_query(ctx, "/cluster/logging/@to_syslog"); - if(res) { - if(!strcmp(res, "yes")) { - logmode |= 
LOG_MODE_OUTPUT_SYSLOG_THREADED; - } else - if(!strcmp(res, "no")) { - logmode &= ~LOG_MODE_OUTPUT_SYSLOG_THREADED; - } else - log_printf(LOG_ERR, "to_syslog: unknown value\n"); - free(res); - res=NULL; - } - - res = do_simple_xml_query(ctx, "/cluster/logging/@to_file"); - if(res) { - if(!strcmp(res, "yes")) { - logmode |= LOG_MODE_OUTPUT_FILE; - } else - if(!strcmp(res, "no")) { - logmode &= ~LOG_MODE_OUTPUT_FILE; - } else - log_printf(LOG_ERR, "to_file: unknown value\n"); - free(res); - res=NULL; - } - - res = do_simple_xml_query(ctx, "/cluster/logging/@logfile"); - if(res) { - if(logsys_config_file_set(&error, res)) - log_printf(LOG_ERR, "logfile: unable to open %s for logging\n", res); - free(res); - res=NULL; - } else - log_printf(LOG_DEBUG, "logfile: use default built-in log file: %s\n", LOGDIR "/ccs.log"); - - res = do_simple_xml_query(ctx, "/cluster/logging/@syslog_facility"); - if(res) { - facility = logsys_facility_id_get (res); - if (facility < 0) { - log_printf(LOG_ERR, "syslog_facility: unknown value\n"); - facility = SYSLOGFACILITY; - } - - logsys_config_facility_set ("CCS", facility); - log_printf(LOG_DEBUG, "log_facility: %s (%d).\n", res, facility); - free(res); - res=NULL; - } - - if(ctx){ - xmlXPathFreeContext(ctx); - } - - if(logmode & LOG_MODE_BUFFER_BEFORE_CONFIG) { - log_printf(LOG_DEBUG, "logsys config enabled from set_ccs_logging\n"); - logmode &= ~LOG_MODE_BUFFER_BEFORE_CONFIG; - logmode |= LOG_MODE_FLUSH_AFTER_CONFIG; - logsys_config_mode_set (logmode); - } - - CCSEXIT("set_ccs_logging"); - return 0; -} diff --git a/ccs/daemon/misc.h b/ccs/daemon/misc.h deleted file mode 100644 index 0636c7e..0000000 --- a/ccs/daemon/misc.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef __MISC_H__ -#define __MISC_H__ - -typedef struct open_doc { - int od_refs; - xmlDocPtr od_doc; -} open_doc_t; - - -extern volatile int quorate; -extern int update_required; -extern pthread_mutex_t update_lock; -extern open_doc_t *master_doc; - -char *get_cluster_name(xmlDocPtr 
ldoc); -int get_doc_version(xmlDocPtr ldoc); -int set_ccs_logging(xmlDocPtr ldoc); - -#endif /* __MISC_H__ */ diff --git a/ccs/man/Makefile b/ccs/man/Makefile deleted file mode 100644 index abedd52..0000000 --- a/ccs/man/Makefile +++ /dev/null @@ -1,6 +0,0 @@ -TARGET= cluster.conf.5 \ - ccs.7 \ - ccsd.8 - -include ../../make/defines.mk -include $(OBJDIR)/make/man.mk diff --git a/ccs/man/ccsd.8 b/ccs/man/ccsd.8 deleted file mode 100644 index bcf0957..0000000 --- a/ccs/man/ccsd.8 +++ /dev/null @@ -1,74 +0,0 @@ -.TH ccsd 8 - -.SH NAME -ccsd - manages the /etc/cluster/cluster.conf file - -.SH SYNOPSIS -.B ccsd -[\fIOPTION\fR].. - -.SH DESCRIPTION - -\fBccsd\fP is part of the Cluster Configuration System (CCS) and manages -the cluster.conf file in a cman cluster. It handles requests for -cluster.conf information made through libccs. It also keeps the -cluster.conf file in sync among cluster nodes based on the value of -cluster.conf:cluster/config_version. ccsd may replace the local -cluster.conf file if it discovers a newer version on another node. - -.SH OPTIONS -.TP -\fB-X\fP -Disable all cluster manager (cman) and inter-node interactions. Simply -respond to local libccs requests based on the current cluster.conf file. -.TP -\fB-4\fP -Use IPv4 for inter-node communication. By default, IPv6 is tried, then IPv4. -.TP -\fB-6\fP -Use IPv6 for inter-node communication. By default, IPv6 is tried, then IPv4. -.TP -\fB-I\fP -Force use of IP for local communication (disables use of UNIX domain sockets). -If set, \fBccsd\fP will use the specified inter-node communication protocol -(see the \fB-4\fP and \fB-6\fP options). If one is not specified, -IPv6 is tried, then IPv4. For backward compatibility, IP connections are -still allowed even when UNIX domain sockets are available. -.TP -\fB-h\fP -Help. Print out the usage syntax. -.TP -\fB-m \fP -Used to specify the multicast address. 
The keyword "default" can be used, -in which case "ff02::3:1" is used for IPv6 and "224.0.2.5" is used for IPv4. - -If you are using IPv4, the default action is to use broadcast. Specifying -this option will cause multicast to be used in that instance. -.TP -\fB-n\fP -No daemon. Run in the foreground. -.TP -\fB-P :\fP -You have the option of specifying the port numbers used by ccsd. The port -identifier is either: b, c, or f. "b" is the port which ccsd attempts to -communicate with ccsd processes on other machines, via broadcast/multicast, to -obtain or validate its config file (cluster.conf). This is known as the backend -port. "c" is the base port number of two consecutive ports used by ccsd -processes to communicate cluster membership information. This is known as the -cluster base port. "f" is the port number that listens for information requests -from the CCS library (or programs using it). This is known as the frontend port. - -So, to change the frontend port one might specify \fI-P f:60000\fP. -.TP -\fB-t \fP -Set the multicast threshold (aka time to live). -.TP -\fB-V\fP -Print the version information. -.TP -\fB-d\fP -Enable debugging output. 
- -.SH SEE ALSO -ccs(7), cman(5), ccs_tool(8), ccs_test(8), cluster.conf(5) - diff --git a/config/Makefile b/config/Makefile index 50468ed..abae65c 100644 --- a/config/Makefile +++ b/config/Makefile @@ -1,4 +1,4 @@ include ../make/defines.mk include $(OBJDIR)/make/passthrough.mk -SUBDIRS=libs plugins tools man +SUBDIRS=libs plugins tools daemons man diff --git a/config/daemons/Makefile b/config/daemons/Makefile new file mode 100644 index 0000000..58121e8 --- /dev/null +++ b/config/daemons/Makefile @@ -0,0 +1,8 @@ +include ../../make/defines.mk +include $(OBJDIR)/make/passthrough.mk + +SUBDIRS = man + +ifdef legacy_code +SUBDIRS += ccsd +endif diff --git a/config/daemons/ccsd/Makefile b/config/daemons/ccsd/Makefile new file mode 100644 index 0000000..4e3efd3 --- /dev/null +++ b/config/daemons/ccsd/Makefile @@ -0,0 +1,37 @@ +TARGET= ccsd + +SBINDIRT=$(TARGET) + +all: depends ${TARGET} + +include ../../../make/defines.mk +include $(OBJDIR)/make/cobj.mk +include $(OBJDIR)/make/clean.mk +include $(OBJDIR)/make/install.mk +include $(OBJDIR)/make/uninstall.mk + +OBJS= ccsd.o \ + cnx_mgr.o \ + cluster_mgr.o \ + misc.o \ + globals.o + +CFLAGS += -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE +CFLAGS += -I${cmanincdir} `xml2-config --cflags` -I${corosyncincdir} +CFLAGS += -I$(S) +CFLAGS += -I${incdir} + +LDFLAGS += -L${cmanlibdir} -lcman +LDFLAGS += -L${corosynclibdir} -llogsys +LDFLAGS += `xml2-config --libs` -lpthread +LDFLAGS += -L${libdir} + +${TARGET}: ${OBJS} + $(CC) -o $@ $^ $(LDFLAGS) + +depends: + $(MAKE) -C $(OBJDIR)/cman/lib all + +clean: generalclean + +-include $(OBJS:.o=.d) diff --git a/config/daemons/ccsd/ccsd.c b/config/daemons/ccsd/ccsd.c new file mode 100644 index 0000000..d3b63d8 --- /dev/null +++ b/config/daemons/ccsd/ccsd.c @@ -0,0 +1,922 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "debug.h" +#include "cnx_mgr.h" +#include 
"cluster_mgr.h" +#include "globals.h" +#include "comm_headers.h" +#include "misc.h" + +#include "copyright.cf" + +int debug = 0; +extern volatile int quorate; +int no_manager_opt=0; +static int exit_now=0; +static unsigned int flags=0; +static sigset_t signal_mask; +static int signal_received = 0; +#define FLAG_NODAEMON 1 + +static char *parse_cli_args(int argc, char *argv[]); +static int check_cluster_conf(void); +static void daemonize(void); +static void print_start_msg(char *msg); +static int join_group(int sfd, int loopback, int port); +static int setup_local_socket(int backlog); +static inline void process_signals(void); + +int main(int argc, char *argv[]){ + int i,error=0; + int trueint = 1; + int sfds[3] = {-1,-1,-1}, afd; + struct sockaddr_storage addr; + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; + int addr_size=0; + fd_set rset, tmp_set; + char *msg; + unsigned int logmode; + + logsys_init("CCS", LOG_MODE_OUTPUT_STDERR | LOG_MODE_OUTPUT_SYSLOG_THREADED | LOG_MODE_OUTPUT_FILE | LOG_MODE_FILTER_DEBUG_FROM_SYSLOG | LOG_MODE_BUFFER_BEFORE_CONFIG, SYSLOGFACILITY, SYSLOGLEVEL, LOGDIR "/ccs.log"); + + msg = parse_cli_args(argc, argv); + + if(getenv("CCS_DEBUGLOG")) + debug = 1; + + /* enable debug as early as possible */ + if(debug) + logsys_config_priority_set (LOG_LEVEL_DEBUG); + + if(check_cluster_conf()){ + /* check_cluster_conf will print out errors if there are any */ + exit(EXIT_FAILURE); + } + + logmode = logsys_config_mode_get(); + + if(logmode & LOG_MODE_BUFFER_BEFORE_CONFIG) { + log_printf(LOG_DEBUG, "Using default CCS logsys config options\n"); + logmode &= ~LOG_MODE_BUFFER_BEFORE_CONFIG; + logmode |= LOG_MODE_FLUSH_AFTER_CONFIG; + logsys_config_mode_set (logmode); + } + + daemonize(); + + print_start_msg(msg); + + if(msg){ + free(msg); + } + + if (!no_manager_opt){ + if(start_cluster_monitor_thread()){ + log_printf(LOG_ERR, "Unable to create thread.\n"); + exit(EXIT_FAILURE); 
+ } + } + + memset(&addr, 0, sizeof(struct sockaddr_storage)); + + /** Setup the socket to communicate with the CCS library **/ + if(IPv6 && (sfds[0] = socket(PF_INET6, SOCK_STREAM, 0)) < 0){ + if(IPv6 == -1){ + log_printf(LOG_DEBUG, "Unable to create IPv6 socket:: %s\n", strerror(errno)); + IPv6=0; + } else { + log_printf(LOG_ERR, "Unable to create IPv6 socket"); + exit(EXIT_FAILURE); + } + } else { + /* IPv6 is no longer optional for ccsd + IPv6 = (IPv6)? 1: 0; + */ + } + + log_printf(LOG_DEBUG, "Using %s\n", IPv6?"IPv6":"IPv4"); + + if(!IPv6 && (sfds[0] = socket(PF_INET, SOCK_STREAM, 0)) < 0){ + log_printf(LOG_ERR, "Unable to create IPv4 socket"); + exit(EXIT_FAILURE); + } + + if(setsockopt(sfds[0], SOL_SOCKET, SO_REUSEADDR, &trueint, sizeof(int))){ + log_printf(LOG_ERR, "Unable to set socket option"); + exit(EXIT_FAILURE); + } + + if(IPv6){ + addr_size = sizeof(struct sockaddr_in6); + addr6->sin6_family = AF_INET6; + addr6->sin6_addr = in6addr_loopback; + addr6->sin6_port = htons(frontend_port); + } else { + addr_size = sizeof(struct sockaddr_in); + addr4->sin_family = AF_INET; + /* addr4->sin_addr.s_addr = INADDR_LOOPBACK; */ + inet_aton("127.0.0.1", (struct in_addr *)&(addr4->sin_addr.s_addr)); + addr4->sin_port = htons(frontend_port); + } + + if(bind(sfds[0], (struct sockaddr *)&addr, addr_size) < 0){ + log_printf(LOG_ERR, "Unable to bind socket"); + close(sfds[0]); + exit(EXIT_FAILURE); + } + + listen(sfds[0], 5); + + + /** Setup the socket to communicate with the CCS library **/ + sfds[1] = socket((IPv6)? 
PF_INET6: PF_INET, SOCK_DGRAM, 0); + if(sfds[1] < 0){ + log_printf(LOG_ERR, "Socket creation failed"); + exit(EXIT_FAILURE); + } else { + int trueint = 1; + if(setsockopt(sfds[1], SOL_SOCKET, SO_REUSEADDR, &trueint, sizeof(int))){ + log_printf(LOG_ERR, "Unable to set socket option"); + exit(EXIT_FAILURE); + } + } + + if(IPv6){ + addr6->sin6_family = AF_INET6; + addr6->sin6_addr = in6addr_any; + addr6->sin6_port = htons(backend_port); + } else { + addr4->sin_family = AF_INET; + addr4->sin_addr.s_addr = INADDR_ANY; + addr4->sin_port = htons(backend_port); + } + + if(bind(sfds[1], (struct sockaddr *)&addr, addr_size) < 0){ + log_printf(LOG_ERR, "Unable to bind socket"); + close(sfds[1]); + return -errno; + } + + if(IPv6 || multicast_address){ + if(join_group(sfds[1], 1, backend_port)){ + log_printf(LOG_ERR, "Unable to join multicast group.\n"); + exit(EXIT_FAILURE); + } + } + + /* Set up the unix (local) socket for CCS lib comms */ + sfds[2] = setup_local_socket(SOMAXCONN); + + FD_ZERO(&rset); + FD_SET(sfds[0], &rset); + FD_SET(sfds[1], &rset); + if (sfds[2] >= 0) + FD_SET(sfds[2], &rset); + + log_printf(LOG_DEBUG, "Sending SIGTERM to parent\n"); + kill(getppid(), SIGTERM); + + while(1){ + unsigned int len = addr_size; + + process_signals(); + + tmp_set = rset; + + if((select(FD_SETSIZE, &tmp_set, NULL,NULL,NULL) < 0)){ + if(errno != EINTR){ + log_printf(LOG_ERR, "Select failed"); + } + continue; + } + + for(i=0; i<3; i++){ + if(sfds[i] < 0 || !FD_ISSET(sfds[i], &tmp_set)){ + continue; + } + if(i == 0){ + uint16_t port; + log_printf(LOG_DEBUG, "NORMAL CCS REQUEST.\n"); + afd = accept(sfds[i], (struct sockaddr *)&addr, &len); + if(afd < 0){ + log_printf(LOG_ERR, "Unable to accept connection"); + continue; + } + + port = (IPv6) ? 
addr6->sin6_port : addr4->sin_port; + + log_printf(LOG_DEBUG, "Connection requested from port %u.\n", ntohs(port)); + + if(ntohs(port) > 1024){ + log_printf(LOG_ERR, "Refusing connection from port > 1024: port = %d", ntohs(port)); + close(afd); + continue; + } + if((error = process_request(afd))){ + log_printf(LOG_ERR, "Error while processing request: %s\n", strerror(-error)); + } + close(afd); + } else if (i == 2) { + log_printf(LOG_DEBUG, "NORMAL CCS REQUEST.\n"); + afd = accept(sfds[i], NULL, NULL); + if(afd < 0){ + log_printf(LOG_ERR, "Unable to accept connection"); + continue; + } + + log_printf(LOG_DEBUG, "Connection requested from local socket\n"); + + if((error = process_request(afd))){ + log_printf(LOG_ERR, "Error while processing request: %s\n", strerror(-error)); + } + close(afd); + } else { + log_printf(LOG_DEBUG, "BROADCAST REQUEST.\n"); + if((error = process_broadcast(sfds[i]))){ + log_printf(LOG_ERR, "Error while processing broadcast: %s\n", strerror(-error)); + } + } + } + } + logsys_exit(); + exit(EXIT_SUCCESS); +} + + +/** + * print_usage - print usage information + * @stream: open file stream to print to + * + */ +static void print_usage(FILE *stream){ + CCSENTER("print_usage"); + fprintf(stream, + "Usage:\n" + "\n" + "ccsd [Options]\n" + "\n" + "Options:\n" + " -4 Use IPv4 only.\n" + " -6 Use IPv6 only.\n" + " -I Use IP for everything (disables local sockets)\n" + " -h Help.\n" + " -m Specify multicast address (\"default\" ok).\n" + " -n No Daemon. 
Run in the foreground.\n" + " -d Enable debugging output.\n" + " -t Multicast threshold (aka Time to Live) value.\n" + " -P [bcf]:# Specify various port numbers.\n" + " -V Print version information.\n" + " -X No cluster manager, just read local " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n" + ); + CCSEXIT("print_usage"); +} + + +static int is_multicast_addr(char *addr_string){ + int rtn = 0; + struct sockaddr_storage addr; + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; + + CCSENTER("is_multicast_addr"); + + if(inet_pton(AF_INET6, addr_string, &(addr6->sin6_addr)) > 0){ + if(IN6_IS_ADDR_MULTICAST(&addr6->sin6_addr)){ + rtn = AF_INET6; + } + } else if(inet_pton(AF_INET, addr_string, &(addr4->sin_addr)) > 0){ + if(IN_MULTICAST(ntohl(addr4->sin_addr.s_addr))){ + rtn = AF_INET; + } + } + CCSEXIT("is_multicast_addr"); + return rtn; +} + + +/** + * parse_cli_args + * @argc: + * @argv: + * + * This function parses the command line arguments and sets the + * appropriate flags in the global 'flags' variable. Additionally, + * it sets the global 'config_file_location'. This function + * will either succeed or cause the program to exit. 
+ * + * Returns: string (or NULL) describing changes, exit(EXIT_FAILURE) on error + */ +static char *parse_cli_args(int argc, char *argv[]){ + int c, error=0; + int buff_size=512; + char buff[buff_size]; + int buff_index=0; + + CCSENTER("parse_cli_args"); + + config_file_location = strdup(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); + lockfile_location = strdup(DEFAULT_CCSD_LOCKFILE); + + if(!config_file_location || !lockfile_location){ + fprintf(stderr, "Insufficient memory.\n"); + error = -ENOMEM; + goto fail; + } + + memset(buff, 0, buff_size); + + while((c = getopt(argc, argv, "46Icdf:hlm:nP:t:sVX")) != -1){ + switch(c){ + case '4': + if(IPv6 == 1){ + fprintf(stderr, + "Setting protocol to IPv4 conflicts with multicast address.\n"); + error = -EINVAL; + goto fail; + } + IPv6=0; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " IP Protocol:: IPv4 only\n"); + break; + case '6': + if(IPv6 == 0){ + fprintf(stderr, + "Setting protocol to IPv6 conflicts with previous protocol choice.\n"); + error = -EINVAL; + goto fail; + } + IPv6=1; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " IP Protocol:: IPv6 only\n"); + break; + case 'I': + if (use_local) { + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Communication:: Local sockets disabled\n"); + } + use_local = 0; + break; + case 'c': + fprintf(stderr, "The '-c' option is deprecated.\n" + "Try '-h' for help.\n"); + error = -EINVAL; + goto fail; + case 'd': + debug = 1; + break; + case 'f': /* might be usable for upgrade */ + free(config_file_location); + config_file_location = optarg; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Config file location:: %s\n", optarg); + break; + case 'h': + print_usage(stdout); + exit(EXIT_SUCCESS); + case 'l': + fprintf(stderr, "The '-l' option is deprecated.\n" + "Try '-h' for help.\n"); + error = -EINVAL; + goto fail; + case 'm': + if(strcmp("default", optarg)){ + int type = is_multicast_addr(optarg); + 
if((IPv6 == 1) && (type != AF_INET6)){ + fprintf(stderr, "%s is not a valid IPv6 multicast address.\n", optarg); + error = -EINVAL; + goto fail; + } else if((IPv6 == 0) && (type != AF_INET)){ + fprintf(stderr, "%s is not a valid IPv4 multicast address.\n", optarg); + error = -EINVAL; + goto fail; + } else if(type == 0){ + fprintf(stderr, "%s is not a valid multicast address.\n", optarg); + error = -EINVAL; + goto fail; + } else { + IPv6 = (type == AF_INET6)? 1: 0; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " IP Protocol:: %s only*\n", + (IPv6)? "IPv6" : "IPv4"); + } + } + multicast_address = optarg; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Multicast (%s):: SET\n", optarg); + break; + case 'n': + flags |= FLAG_NODAEMON; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " No Daemon:: SET\n"); + break; + case 'p': + free(lockfile_location); + lockfile_location = optarg; + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Lock file location:: %s\n", optarg); + break; + case 'P': + if(optarg[1] != ':'){ + fprintf(stderr, "Bad argument to '-P' option.\n" + "Try '-h' for help.\n"); + error = -EINVAL; + goto fail; + } + switch(optarg[0]){ + case 'b': /* backend port number */ + backend_port = atoi(optarg+2); + if(backend_port < 1024){ + fprintf(stderr, "Bad backend port number.\n"); + error = -EINVAL; + goto fail; + } + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Backend Port:: %d\n", backend_port); + break; + case 'c': /* cluster base port number */ + cluster_base_port = atoi(optarg+2); + if(cluster_base_port < 1024){ + fprintf(stderr, "Bad cluster base port number.\n"); + error = -EINVAL; + goto fail; + } + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Cluster base port:: %d\n", cluster_base_port); + break; + case 'f': /* frontend port number */ + frontend_port = atoi(optarg+2); + if(frontend_port < 1024){ + fprintf(stderr, "Bad frontend port 
number.\n"); + error = -EINVAL; + goto fail; + } + buff_index += snprintf(buff+buff_index, buff_size-buff_index, + " Frontend Port:: %d\n", frontend_port); + break; + default: + fprintf(stderr, "Bad argument to '-P' option.\n" + "Try '-h' for help.\n"); + error = -EINVAL; + goto fail; + } + break; + case 's': + fprintf(stderr, "The '-s' option is deprecated.\n" + "Try '-h' for help.\n"); + error = -EINVAL; + goto fail; + case 't': + ttl = atoi(optarg); + break; + case 'V': + printf("%s %s (built %s %s)\n", argv[0], RELEASE_VERSION, __DATE__, __TIME__); + printf("%s\n", REDHAT_COPYRIGHT); + exit(EXIT_SUCCESS); + case 'X': + no_manager_opt = 1; + quorate = 1; + break; + default: + print_usage(stderr); + error = -EINVAL; + goto fail; + } + } + + fail: + CCSEXIT("parse_cli_args"); + + if(error){ + exit(EXIT_FAILURE); + } + if(strlen(buff)){ + return(strdup(buff)); + } else { + return NULL; + } +} + + +/* + * check_cluster_conf - check validity of local copy of cluster.conf + * + * This function tries to parse the xml doc at 'config_file_location'. + * If it fails, it gives instructions to the user. + * + * Returns: 0 on success, -1 on failure + */ +static int check_cluster_conf(void){ + struct stat stat_buf; + xmlDocPtr doc = NULL; + + CCSENTER("check_cluster_conf"); + + if(!stat(config_file_location, &stat_buf)){ + doc = xmlParseFile(config_file_location); + if(!doc){ + log_printf(LOG_ERR, "\nUnable to parse %s.\n" + "You should either:\n" + " 1. Correct the XML mistakes, or\n" + " 2. (Re)move the file and attempt to grab a " + "valid copy from the network.\n", config_file_location); + return -1; + } + set_ccs_logging(doc); + xmlFreeDoc(doc); + } else { + /* no cluster.conf file. 
This is fine, just need to get it from the network */ + if(no_manager_opt){ + log_printf(LOG_ERR, "\nNo local config file found: %s\n", config_file_location); + return -1; + } + } + + CCSEXIT("check_cluster_conf"); + return 0; +} + + +/** + * create_lockfile - create and lock a lock file + * @lockfile: location of lock file + * + * Returns: 0 on success, -1 otherwise + */ +static int create_lockfile(char *lockfile){ + int fd, error=0; + struct stat stat_buf; + struct flock lock; + char buffer[50]; + + CCSENTER("create_lockfile"); + + if(!strncmp(lockfile, "/var/run/cluster/", 17)){ + if(stat("/var/run/cluster", &stat_buf)){ + if(mkdir("/var/run/cluster", S_IRWXU)){ + log_printf(LOG_ERR, "Cannot create lockfile directory"); + error = -errno; + goto fail; + } + } else if(!S_ISDIR(stat_buf.st_mode)){ + log_printf(LOG_ERR, "/var/run/cluster is not a directory.\n" + "Cannot create lockfile.\n"); + error = -ENOTDIR; + goto fail; + } + } + + if((fd = open(lockfile, O_CREAT | O_WRONLY, + (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) < 0){ + log_printf(LOG_ERR, "Cannot create lockfile"); + error = -errno; + goto fail; + } + + lock.l_type = F_WRLCK; + lock.l_start = 0; + lock.l_whence = SEEK_SET; + lock.l_len = 0; + + if (fcntl(fd, F_SETLK, &lock) < 0) { + close(fd); + log_printf(LOG_ERR, "The ccsd process is already running.\n"); + error = -errno; + goto fail; + } + + if (ftruncate(fd, 0) < 0) { + close(fd); + error = -errno; + goto fail; + } + + sprintf(buffer, "%d\n", getpid()); + + if(write(fd, buffer, strlen(buffer)) < strlen(buffer)){ + close(fd); + unlink(lockfile); + error = -errno; + goto fail; + } + + fail: + CCSEXIT("create_lockfile"); + + /* leave fd open - rely on exit to close it */ + if(error){ + return error; + } else { + return 0; + } +} + + +/** + * parent_exit_handler: exit the parent + * @sig: the signal + * + */ +static void parent_exit_handler(int sig){ + CCSENTER("parent_exit_handler"); + exit_now=1; + CCSEXIT("parent_exit_handler"); +} + + +/** + * 
sig_handler - record a delivered signal for later processing
+ * @sig: number of the signal that was delivered
+ *
+ * This handles signals which the daemon might receive.  Nothing is logged
+ * and the process never exits from here: the signal is only added to
+ * signal_mask and counted in signal_received.  process_signals() acts on
+ * the recorded signals afterwards.
+ */
+static void sig_handler(int sig){
+	sigaddset(&signal_mask, sig);
+	++signal_received;
+}
+
+/**
+ * process_signal - act on one signal previously recorded by sig_handler
+ * @sig: signal number
+ *
+ * SIGHUP is only logged and the function returns to its caller.
+ * SIGINT, SIGQUIT and SIGTERM log a message and exit with EXIT_SUCCESS;
+ * any other signal logs an error and exits with EXIT_FAILURE.
+ */
+static void process_signal(int sig){
+	int err;
+
+	/* trace label names this function */
+	CCSENTER("process_signal");
+
+	switch(sig) {
+	case SIGINT:
+		log_printf(LOG_INFO, "Stopping ccsd, SIGINT received.\n");
+		err = EXIT_SUCCESS;
+		break;
+	case SIGQUIT:
+		log_printf(LOG_INFO, "Stopping ccsd, SIGQUIT received.\n");
+		err = EXIT_SUCCESS;
+		break;
+	case SIGTERM:
+		log_printf(LOG_INFO, "Stopping ccsd, SIGTERM received.\n");
+		err = EXIT_SUCCESS;
+		break;
+	case SIGHUP:
+		log_printf(LOG_INFO, "SIGHUP received.\n");
+		log_printf(LOG_INFO, "Use ccs_tool for updates.\n");
+		/* not fatal: close the trace before going back to the caller */
+		CCSEXIT("process_signal");
+		return;
+	default:
+		log_printf(LOG_ERR, "Stopping ccsd, unknown signal %d received.\n", sig);
+		err = EXIT_FAILURE;
+	}
+
+	CCSEXIT("process_signal");
+	exit(err);
+}
+
+
+/**
+ * process_signals - handle every signal recorded since the last call
+ *
+ * Returns immediately when signal_received is zero.  Otherwise the counter
+ * is cleared and each signal number from 1 up to _NSIG - 1 that is a member
+ * of signal_mask is removed from the set and passed to process_signal().
+ */
+static inline void process_signals(void)
+{
+	int x;
+
+	if (!signal_received)
+		return;
+
+	signal_received = 0;
+
+	for (x = 1; x < _NSIG; x++) {
+		if (sigismember(&signal_mask, x)) {
+			/* drop it from the set first: process_signal() may not return */
+			sigdelset(&signal_mask, x);
+			process_signal(x);
+		}
+	}
+}
+
+
+/**
+ * daemonize
+ *
+ * This function will do the following:
+ * - daemonize, if required
+ * - set up the lockfile
+ * - set up logging
+ * - set up signal handlers
+ * It will cause the program to exit if there is a failure. 
+ */ +static void daemonize(void){ + int error=0; + int pid; + + CCSENTER("daemonize"); + + if(flags & FLAG_NODAEMON){ + log_printf(LOG_DEBUG, "Entering non-daemon mode.\n"); + if((error = create_lockfile(lockfile_location))){ + goto fail; + } + } else { + log_printf(LOG_DEBUG, "Entering daemon mode.\n"); + + signal(SIGTERM, &parent_exit_handler); + + pid = fork(); + + if(pid < 0){ + log_printf(LOG_ERR, "Unable to fork().\n"); + error = pid; + goto fail; + } + + if(pid){ + int status; + while(!waitpid(pid, &status, WNOHANG) && !exit_now); + if(exit_now) { + exit(EXIT_SUCCESS); + } + + switch(WEXITSTATUS(status)){ + case EXIT_CLUSTER_FAIL: + log_printf(LOG_ERR, "Failed to connect to cluster manager.\n"); + break; + case EXIT_LOCKFILE: + log_printf(LOG_ERR, "Failed to create lockfile.\n"); + log_printf(LOG_ERR, "Hint: ccsd is already running.\n"); + break; + } + exit(EXIT_FAILURE); + } + ppid = getppid(); + setsid(); + if(chdir("/") < 0) + goto fail; + umask(0); + + close(0); close(1); close(2); + open("/dev/null", O_RDONLY); /* reopen stdin */ + open("/dev/null", O_WRONLY); /* reopen stdout */ + open("/dev/null", O_WRONLY); /* reopen stderr */ + + if((error = create_lockfile(lockfile_location))){ + exit(EXIT_LOCKFILE); + } + + /* Make the parent stop waiting */ + //log_printf(LOG_DEBUG, "Die early\n"); + //kill(getppid(), SIGTERM); + } + + signal(SIGINT, &sig_handler); + signal(SIGQUIT, &sig_handler); + signal(SIGTERM, &sig_handler); + signal(SIGHUP, &sig_handler); + signal(SIGPIPE, SIG_IGN); + sigemptyset(&signal_mask); + signal_received = 0; + + fail: + CCSEXIT("daemonize"); + + if(error){ + exit(EXIT_FAILURE); + } +} + + +/** + * print_start_msg + * + */ +static void print_start_msg(char *msg){ + CCSENTER("print_start_msg"); + /* We want the start message to print every time */ + log_printf(LOG_INFO, "Starting ccsd %s:\n", RELEASE_VERSION); + log_printf(LOG_INFO, " Built: "__DATE__" "__TIME__"\n"); + log_printf(LOG_INFO, " %s\n", REDHAT_COPYRIGHT); + if(msg){ + 
log_printf(LOG_INFO, "%s\n", msg); + } + CCSEXIT("print_start_msg"); +} + + +static int join_group(int sfd, int loopback, int port){ + int error = 0; + char *addr_string; + struct sockaddr_storage addr; + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; + + CCSENTER("join_group"); + + if(IPv6){ + if(!multicast_address || !strcmp("default", multicast_address)){ + addr_string = "ff02::3:1"; + } else { + addr_string = multicast_address; + } + inet_pton(AF_INET6, addr_string, &(addr6->sin6_addr)); + addr6->sin6_family = AF_INET6; + addr6->sin6_port = htons(port); + } else { + if(!strcmp("default", multicast_address)){ + addr_string = "224.0.2.5"; + } else { + addr_string = multicast_address; + } + inet_pton(AF_INET, addr_string, &(addr4->sin_addr)); + addr4->sin_family = AF_INET; + addr4->sin_port = htons(port); + } + + if(addr.ss_family == AF_INET){ + struct ip_mreq mreq; + + mreq.imr_multiaddr.s_addr = addr4->sin_addr.s_addr; + mreq.imr_interface.s_addr = INADDR_ANY; + + if(setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_LOOP, + &loopback, sizeof(loopback)) < 0){ + log_printf(LOG_ERR, "Unable to %s loopback.\n", loopback?"SET":"UNSET"); + error = -errno; + goto fail; + } + if(setsockopt(sfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (const void *)&mreq, sizeof(mreq)) < 0){ + log_printf(LOG_ERR, "Unable to add to membership.\n"); + error = -errno; + goto fail; + } + } else if(addr.ss_family == AF_INET6){ + struct ipv6_mreq mreq; + + memcpy(&mreq.ipv6mr_multiaddr, &(addr6->sin6_addr), sizeof(struct in6_addr)); + + mreq.ipv6mr_interface = 0; + + if(setsockopt(sfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + &loopback, sizeof(loopback)) < 0){ + log_printf(LOG_ERR, "Unable to %s loopback.\n", loopback?"SET":"UNSET"); + error = -errno; + goto fail; + } + if(setsockopt(sfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + (const void *)&mreq, sizeof(mreq)) < 0){ + log_printf(LOG_ERR, "Unable to add to membership: %s\n", strerror(errno)); + 
error = -errno;
+			goto fail;
+		}
+	} else {
+		log_printf(LOG_ERR, "Unknown address family.\n");
+		error = -EINVAL;
+	}
+ fail:
+	CCSEXIT("join_group");
+	/* report the recorded failure; main() exits when this is non-zero */
+	return error;
+}
+
+/**
+ * setup_local_socket - create the listening unix socket for CCS lib comms
+ * @backlog: backlog value passed to listen()
+ *
+ * Does nothing when use_local is 0.  Otherwise any existing file at
+ * COMM_LOCAL_SOCKET is unlinked and a stream socket is bound there while
+ * the umask is temporarily set to 077.
+ *
+ * Returns: the listening descriptor on success, -1 if local sockets are
+ * disabled or any step fails
+ */
+static int setup_local_socket(int backlog)
+{
+	int sock = -1;
+	struct sockaddr_un su;
+	mode_t om;
+
+	CCSENTER("setup_local_socket");
+	if (use_local == 0)
+		goto fail;
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		goto fail;
+
+	/* This is ours ;) */
+	unlink(COMM_LOCAL_SOCKET);
+	om = umask(077);
+	/* start from a zeroed address so no stack garbage reaches bind() */
+	memset(&su, 0, sizeof(su));
+	su.sun_family = AF_LOCAL;
+	/* copy the path as data; never use it as the format string */
+	snprintf(su.sun_path, sizeof(su.sun_path), "%s", COMM_LOCAL_SOCKET);
+
+	if (bind(sock, (struct sockaddr *)&su, sizeof(su)) < 0) {
+		umask(om);
+		goto fail;
+	}
+	umask(om);
+
+	if (listen(sock, backlog) < 0)
+		goto fail;
+
+	log_printf(LOG_DEBUG, "Set up local socket on %s\n", su.sun_path);
+	CCSEXIT("setup_local_socket");
+	return sock;
+fail:
+	if (sock >= 0)
+		close(sock);
+	CCSEXIT("setup_local_socket");
+	return -1;
+}
diff --git a/config/daemons/ccsd/cluster_mgr.c b/config/daemons/ccsd/cluster_mgr.c
new file mode 100644
index 0000000..236933f
--- /dev/null
+++ b/config/daemons/ccsd/cluster_mgr.c
@@ -0,0 +1,688 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "comm_headers.h"
+#include "debug.h"
+#include "misc.h"
+#include "globals.h"
+#include "libcman.h"
+
+typedef struct member_list {
+	int count;
+	int pad;
+	cman_node_t *nodes;
+} member_list_t;
+
+static member_list_t *members = NULL;
+
+static member_list_t *get_member_list(cman_handle_t handle);
+static void free_member_list(member_list_t *list);
+static char *member_id_to_name(member_list_t *list, int node);
+static int member_addr_to_id(member_list_t *list, struct sockaddr *addr);
+
+static int select_retry(int max_fd, fd_set *rfds, fd_set *wfds, fd_set *xfds,
+			struct timeval *timeout);
+
+static ssize_t read_retry(int fd, void *buf, int count, struct timeval *timeout);
+
+static int check_update_doc(xmlDocPtr 
tmp_doc) +{ + int error = 0; + + char *str1 = NULL; + char *str2 = NULL; + + CCSENTER("check_update_doc"); + + if (!(str1 = get_cluster_name(tmp_doc))) { + log_printf(LOG_ERR, "Unable to get cluster name from new config file.\n"); + error = -EINVAL; + goto fail; + } + + if (master_doc && master_doc->od_doc && + !(str2 = get_cluster_name(master_doc->od_doc))) { + log_printf(LOG_DEBUG, "Unable to get cluster name from current master doc.\n"); + } + + if (str2 && strcmp(str1, str2)) { + log_printf(LOG_ERR, "Cluster names for current and update configs do not match.\n"); + log_printf(LOG_ERR, " Current cluster name:: <%s>\n", str2); + log_printf(LOG_ERR, " Proposed update name:: <%s>\n", str1); + error = -EINVAL; + goto fail; + } + + if (master_doc && master_doc->od_doc && + (get_doc_version(tmp_doc) <= get_doc_version(master_doc->od_doc))) { + log_printf(LOG_ERR, "Proposed updated config file does not have greater version number.\n"); + log_printf(LOG_ERR, " Current config_version :: %d\n", get_doc_version(master_doc->od_doc)); + log_printf(LOG_ERR, " Proposed config_version:: %d\n", get_doc_version(tmp_doc)); + error = -EINVAL; + } + +fail: + + if (str1) { + free(str1); + } + + if (str2) { + free(str2); + } + + CCSEXIT("check_update_doc"); + return error; +} + +static int handle_cluster_message(int fd) +{ + int error = 0; + int unlock = 0; + int socket = -1; + + FILE *fp = NULL; + char *buffer = NULL; + xmlDocPtr tmp_doc = NULL; + comm_header_t ch; + uint64_t nodeid; + mode_t old_mode; + socklen_t client_len; + + struct timeval tv; + struct sockaddr client_addr; + static uint64_t master_node = 0; + + CCSENTER("handle_cluster_message"); + + log_printf(LOG_DEBUG, "Cluster message on socket: %d\n", fd); + + client_len = sizeof(client_addr); + + if ((socket = accept(fd, &client_addr, &client_len)) < 0) { + log_printf(LOG_ERR, "Failed to accept connection.\n"); + goto fail; + } + + if ((nodeid = member_addr_to_id(members, &client_addr)) < 0) { + log_printf(LOG_ERR, 
"Unable to determine node ID.\n"); + goto fail; + } + + log_printf(LOG_DEBUG, "Accept socket: %d\n", socket); + + error = recv(socket, &ch, sizeof(comm_header_t), MSG_PEEK); + + if (error < 0) { + log_printf(LOG_ERR, "Failed to receive message from %s\n", + member_id_to_name(members, nodeid)); + goto fail; + } + + log_printf(LOG_DEBUG, "Message (%d bytes) received from %s\n", + error, member_id_to_name(members, nodeid)); + + swab_header(&ch); + + if (ch.comm_type != COMM_UPDATE) { + log_printf(LOG_ERR, "Unexpected communication type (%d)... ignoring.\n", + ch.comm_type); + error = -EINVAL; + goto fail; + } + + if (ch.comm_flags == COMM_UPDATE_NOTICE) { + buffer = malloc(ch.comm_payload_size + sizeof(comm_header_t)); + if (!buffer) { + log_printf(LOG_ERR, "Unable to allocate space to perform update.\n"); + error = -ENOMEM; + goto fail; + } + + log_printf(LOG_DEBUG, "Updated config size:: %d\n", ch.comm_payload_size); + + tv.tv_sec = 5; + tv.tv_usec = 0; + + error = read_retry(socket, buffer, ch.comm_payload_size + sizeof(comm_header_t), &tv); + + if (error < 0) { + log_printf(LOG_ERR, "Unable to retrieve updated config"); + goto fail; + } + + pthread_mutex_lock(&update_lock); + unlock = 1; + + log_printf(LOG_DEBUG, "Got lock 0\n"); + + tmp_doc = xmlParseMemory(buffer+sizeof(comm_header_t), ch.comm_payload_size); + + if (!tmp_doc) { + log_printf(LOG_ERR, "Unable to parse updated config file.\n"); + /* ATTENTION -- need better error code */ + error = -EIO; + goto fail; + } + + if ((error = check_update_doc(tmp_doc)) < 0) { + goto fail; + } + + old_mode = umask(026); + + fp = fopen(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update", "w"); + + umask(old_mode); + + if (!fp) { + log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); + error = -errno; + goto fail; + } + + if (xmlDocDump(fp, tmp_doc) < 0) { + log_printf(LOG_ERR, "Unable to write " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); + goto fail; + } + + 
log_printf(LOG_DEBUG, "Upload of new config file from %s complete.\n", + member_id_to_name(members, nodeid)); + + ch.comm_payload_size = 0; + ch.comm_flags = COMM_UPDATE_NOTICE_ACK; + + log_printf(LOG_DEBUG, "Sending COMM_UPDATE_NOTICE_ACK.\n"); + + swab_header(&ch); + + if ((error = write(socket, &ch, sizeof(comm_header_t))) < 0) { + log_printf(LOG_ERR, "Unable to send COMM_UPDATE_NOTICE_ACK.\n"); + goto fail; + } + + master_node = nodeid; + error = 0; + } + + else if(ch.comm_flags == COMM_UPDATE_COMMIT) { + + tv.tv_sec = 5; + tv.tv_usec = 0; + + error = read_retry(socket, &ch, sizeof(comm_header_t), &tv); + + if (master_node != nodeid) { + log_printf(LOG_ERR, "COMM_UPDATE_COMMIT received from node other than initiator.\n"); + log_printf(LOG_ERR, "Hint: There may be multiple updates happening at once.\n"); + error = -EPERM; + goto fail; + } + + pthread_mutex_lock(&update_lock); + + unlock = 1; + + log_printf(LOG_DEBUG, "Got lock 1\n"); + + tmp_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update"); + + if (!tmp_doc) { + log_printf(LOG_ERR, "Unable to parse updated config file.\n"); + /* ATTENTION -- need better error code */ + error = -EIO; + goto fail; + } + + if ((error = check_update_doc(tmp_doc)) < 0) { + goto fail; + } + + old_mode = umask(026); + + fp = fopen(DEFAULT_CONFIG_DIR "/." DEFAULT_CONFIG_FILE, "w"); + + umask(old_mode); + + if (!fp) { + log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/." DEFAULT_CONFIG_FILE); + error = -errno; + goto fail; + } + + if (xmlDocDump(fp, tmp_doc) < 0) { + log_printf(LOG_ERR, "Unable to write " DEFAULT_CONFIG_DIR "/." 
DEFAULT_CONFIG_FILE); + goto fail; + } + + rename(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "-update", DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); + + update_required = 1; + ch.comm_flags = COMM_UPDATE_COMMIT_ACK; + + log_printf(LOG_DEBUG, "Sending COMM_UPDATE_COMMIT_ACK.\n"); + + swab_header(&ch); + + if ((error = write(socket, &ch, sizeof(comm_header_t))) < 0) { + log_printf(LOG_ERR, "Unable to send COMM_UPDATE_NOTICE_ACK.\n"); + goto fail; + } + + error = 0; + } + +fail: + + if (fp) { + fclose(fp); + } + + if (socket >= 0) { + close(socket); + } + + if (buffer) { + free(buffer); + } + + if (tmp_doc) { + xmlFreeDoc(tmp_doc); + } + + if (unlock) { + pthread_mutex_unlock(&update_lock); + } + + CCSEXIT("handle_cluster_message"); + return error; +} + + +static void cman_callback(cman_handle_t handle, void *private, int reason, int arg) +{ + switch (reason) { + case CMAN_REASON_TRY_SHUTDOWN: + cman_replyto_shutdown(handle, 1); + break; + + case CMAN_REASON_STATECHANGE: + quorate = cman_is_quorate(handle); + free_member_list(members); + members = get_member_list(handle); + break; + + default: + break; + } +} + + +static int handle_cluster_event(cman_handle_t handle) +{ + CCSENTER("handle_cluster_event"); + + int rv = 1; + while (rv > 0) { + rv = cman_dispatch(handle, CMAN_DISPATCH_ALL); + } + if (rv < 0) { + return -1; + } + + CCSEXIT("handle_cluster_event"); + return 0; +} + + +static void cluster_communicator(void) +{ + int ccsd_fd = -1; + int cman_fd = -1; + int warn_user = 0; + int opt = 1; + int max_fd; + int n; + int flags; + + fd_set rset; + cman_handle_t handle = NULL; + + struct sockaddr_storage addr; + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; + int addr_size=0; + + CCSENTER("cluster_communicator"); + + memset(&addr, 0, sizeof(struct sockaddr_storage)); + + if (IPv6) { + if ((ccsd_fd = socket(PF_INET6, SOCK_STREAM, 0)) < 0) { + if(IPv6 == -1) { + log_printf(LOG_DEBUG, "Unable to 
create IPv6 socket:: %s\n", strerror(errno)); + IPv6=0; + } + } + } + + if (!IPv6) { + if ((ccsd_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + log_printf(LOG_ERR, "Unable to create IPv4 socket.\n"); + exit(EXIT_FAILURE); + } + } + + if (setsockopt(ccsd_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) { + log_printf(LOG_ERR, "Unable to set socket option"); + exit(EXIT_FAILURE); + } + + if(IPv6){ + addr_size = sizeof(struct sockaddr_in6); + addr6->sin6_family = AF_INET6; + addr6->sin6_addr = in6addr_any; + addr6->sin6_port = htons(cluster_base_port); + } else { + addr_size = sizeof(struct sockaddr_in); + addr4->sin_family = AF_INET; + addr4->sin_addr.s_addr = INADDR_ANY; + addr4->sin_port = htons(cluster_base_port); + } + + flags = fcntl(ccsd_fd, F_GETFD, 0); + flags |= FD_CLOEXEC; + fcntl(ccsd_fd, F_SETFD, flags); + + if (bind(ccsd_fd, (struct sockaddr *)&addr, addr_size) < 0) { + log_printf(LOG_ERR, "Unable to bind to socket.\n"); + close(ccsd_fd); + exit(EXIT_FAILURE); + } + + if (listen(ccsd_fd, 15) < 0) { + log_printf(LOG_ERR, "Unable to listen to socket.\n"); + close(ccsd_fd); + exit(EXIT_FAILURE); + } + +restart: + + while (handle == NULL) + { + handle = cman_init(NULL); + + if (handle == NULL) { + + warn_user++; + + if (!(warn_user % 30)) + { + log_printf(LOG_ERR, "Unable to connect to cluster infrastructure after %d seconds.\n", + warn_user); + } + + sleep(1); + } + } + + if (ppid) { + kill(ppid, SIGTERM); + ppid = 0; + } + + cman_start_notification(handle, cman_callback); + + quorate = cman_is_quorate(handle); + + log_printf(LOG_INFO, "Initial status:: %s\n", (quorate)? "Quorate" : "Inquorate"); + + members = get_member_list(handle); + + while (1) + { + FD_ZERO(&rset); + cman_fd = cman_get_fd(handle); + + FD_SET(ccsd_fd, &rset); + FD_SET(cman_fd, &rset); + + max_fd = (ccsd_fd > cman_fd) ? 
ccsd_fd : cman_fd; + + log_printf(LOG_DEBUG, "Waiting for cluster event.\n"); + + if ((n = select((max_fd + 1), &rset, NULL, NULL, NULL)) < 0) { + log_printf(LOG_ERR, "Select failed"); + continue; + } + + log_printf(LOG_DEBUG, "There are %d cluster messages waiting.\n", n); + + while (n) + { + log_printf(LOG_DEBUG, "There are %d messages remaining.\n", n); + + n--; + + if (FD_ISSET(ccsd_fd, &rset)) { + handle_cluster_message(ccsd_fd); + } + + if (FD_ISSET(cman_fd, &rset)) { + if (handle_cluster_event(handle)) { + cman_finish(handle); + handle = NULL; + goto restart; + } + } + } + } + + CCSEXIT("cluster_communicator"); +} + + +int start_cluster_monitor_thread(void) { + int error = 0; + pthread_t thread; + + CCSENTER("start_cluster_monitor_thread"); + + pthread_mutex_init(&update_lock, NULL); + + error = pthread_create(&thread, NULL, (void *)cluster_communicator, NULL); + + if (error) { + log_printf(LOG_ERR, "Failed to create thread: %s\n", strerror(-error)); + goto fail; + } + + pthread_detach(thread); + +fail: + + CCSEXIT("start_cluster_monitor_thread"); + return error; +} + + +static member_list_t *get_member_list(cman_handle_t handle) +{ + int count = 0; + + member_list_t *list = NULL; + cman_node_t *nodes = NULL; + + do + { + + if (nodes != NULL) { + free(nodes); + } + + count = cman_get_node_count(handle); + + if (count <= 0) { + return NULL; + } + + if (list == NULL) { + list = malloc(sizeof(*list)); + } + + if (list == NULL) { + return NULL; + } + + nodes = malloc(sizeof(*nodes) * count); + + if (nodes == NULL) { + free(list); + return NULL; + } + + memset(list, 0, sizeof(*list)); + memset(nodes, 0, sizeof(*nodes) * count); + + cman_get_nodes(handle, count, &list->count, nodes); + + } while (list->count != count); + + list->count = count; + list->nodes = nodes; + + return list; +} + + +static void free_member_list(member_list_t *list) +{ + if (list != NULL) { + if (list->nodes != NULL) { + free(list->nodes); + } + free(list); + } +} + + +static char 
*member_id_to_name(member_list_t *list, int node) +{ + int i; + + for (i = 0; i < list->count; i++) { + if (list->nodes[i].cn_nodeid == node) { + return list->nodes[i].cn_name; + } + } + + return NULL; +} + + +static int member_addr_to_id(member_list_t *list, struct sockaddr *addr) +{ + int i; + + for (i = 0; i < list->count; i++) { + if (memcmp(&list->nodes[i].cn_address.cna_address, addr, + sizeof(struct sockaddr))) { + + return list->nodes[i].cn_nodeid; + } + } + + return -1; +} + + +static int select_retry(int max_fd, fd_set *rfds, fd_set *wfds, fd_set *xfds, + struct timeval *timeout) +{ + int rv; + + while (1) { + rv = select(max_fd, rfds, wfds, xfds, timeout); + if ((rv == -1) && (errno == EINTR)) { + /* return on EBADF/EINVAL/ENOMEM; continue on EINTR */ + continue; + } + return rv; + } +} + + +static ssize_t read_retry(int fd, void *buf, int count, struct timeval *timeout) +{ + int n, total = 0, remain = count, rv = 0; + fd_set rfds, xfds; + + while (total < count) + { + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + FD_ZERO(&xfds); + FD_SET(fd, &xfds); + + /* + * Select on the socket, in case it closes while we're not + * looking... + */ + rv = select_retry(fd + 1, &rfds, NULL, &xfds, timeout); + if (rv == -1) { + return -1; + } + else if (rv == 0) { + errno = ETIMEDOUT; + return -1; + } + + if (FD_ISSET(fd, &xfds)) { + errno = EPIPE; + return -1; + } + + /* + * Attempt to read off the socket + */ + n = read(fd, buf + (off_t) total, remain); + + /* + * When we know our socket was select()ed and we receive 0 bytes + * when we read, the socket was closed. + */ + if ((n == 0) && (rv == 1)) { + errno = EPIPE; + return -1; + } + + if (n == -1) { + if ((errno == EAGAIN) || (errno == EINTR)) { + /* + * Not ready? 
Wait for data to become available + */ + continue; + } + + /* Other errors: EPIPE, EINVAL, etc */ + return -1; + } + + total += n; + remain -= n; + } + + return total; +} diff --git a/config/daemons/ccsd/cluster_mgr.h b/config/daemons/ccsd/cluster_mgr.h new file mode 100644 index 0000000..b97b552 --- /dev/null +++ b/config/daemons/ccsd/cluster_mgr.h @@ -0,0 +1,6 @@ +#ifndef __CLUSTER_MGR_DOT_H__ +#define __CLUSTER_MGR_DOT_H__ + +int start_cluster_monitor_thread(void); + +#endif /* __CLUSTER_MGR_DOT_H__ */ diff --git a/config/daemons/ccsd/cnx_mgr.c b/config/daemons/ccsd/cnx_mgr.c new file mode 100644 index 0000000..0dfa1cd --- /dev/null +++ b/config/daemons/ccsd/cnx_mgr.c @@ -0,0 +1,1393 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "comm_headers.h" +#include "debug.h" +#include "misc.h" +#include "globals.h" + +/* Default descriptor expiration time, in seconds */ +#ifndef DEFAULT_EXPIRE +#define DEFAULT_EXPIRE 30 +#endif + +/* Maximum open connection count */ +#ifndef MAX_OPEN_CONNECTIONS +#define MAX_OPEN_CONNECTIONS 30 +#endif + +/* Conversion from descriptor to ocs index */ +#ifdef dindex +#undef dindex +#endif +#define dindex(x) ((x) % MAX_OPEN_CONNECTIONS) + +static inline void _cleanup_descriptor(int desc); + +extern int no_manager_opt; + +typedef struct open_connection_s { + char *oc_cwp; + char *oc_query; + open_doc_t *oc_odoc; + xmlXPathContextPtr oc_ctx; + int oc_index; + int oc_desc; + time_t oc_expire; +} open_connection_t; + +/* ATTENTION: need to lock on this if we start forking the daemon ** +** Also would need to create a shared memory area for open cnx's */ +static open_connection_t **ocs = NULL; +static int _descbase = 0; + +static int _update_config(char *location){ + int error = 0; + int v1=0, v2=0; + open_doc_t *tmp_odoc = NULL; + xmlDocPtr tmp_doc = NULL; + + 
CCSENTER("_update_config"); + + tmp_doc = xmlParseFile(location); + if(!tmp_doc){ + log_printf(LOG_ERR, "Unable to parse %s\n", location); + error = -EINVAL; + goto fail; + } else if((v2 = get_doc_version(tmp_doc)) < 0){ + log_printf(LOG_ERR, "Unable to get config_version from %s.\n", location); + error = v2; + goto fail; + } else if(master_doc && master_doc->od_doc){ + v1 = get_doc_version(master_doc->od_doc); + if(v1 >= v2){ + log_printf(LOG_ERR, "%s on-disk version is <= to in-memory version.\n", location); + log_printf(LOG_ERR, " On-disk version : %d\n", v2); + log_printf(LOG_ERR, " In-memory version : %d\n", v1); + error = -EPERM; + goto fail; + } + } else { + v1 = 0; + } + + if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ + error = -ENOMEM; + goto fail; + } + memset(tmp_odoc, 0, sizeof(open_doc_t)); + + tmp_odoc->od_doc = tmp_doc; + + log_printf(LOG_DEBUG, "There are %d references open on version %d of the config file.\n", + (master_doc)?master_doc->od_refs:0, v1); + if(master_doc && !master_doc->od_refs){ + log_printf(LOG_DEBUG, "Freeing version %d\n", v1); + xmlFreeDoc(master_doc->od_doc); + free(master_doc); + master_doc = tmp_odoc; + } else { + master_doc = tmp_odoc; + } + + log_printf(LOG_INFO, "Update of "DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE " complete (version %d -> %d).\n", v1, v2); + fail: + if(tmp_odoc != master_doc){ + free(tmp_odoc); + } + if(tmp_doc != master_doc->od_doc){ + xmlFreeDoc(tmp_doc); + } + + + CCSEXIT("_update_config"); + return error; +} + + +static int update_config(void){ + int error = 0; + CCSENTER("update_config"); + + /* If update_required is set, it means that there is still a pending ** + ** update. We need to pull this one in before doing anything else. */ + if(update_required){ + error = _update_config(DEFAULT_CONFIG_DIR "/." 
DEFAULT_CONFIG_FILE); + update_required = 0; + if(error){ + log_printf(LOG_ERR, "Previous update could not be completed.\n"); + goto fail; + } + } + + fail: + CCSEXIT("update_config"); + return error; +} + +/** + * broadcast_for_doc + * + * Returns: 0 on success, < 0 on error + */ +static int broadcast_for_doc(char *cluster_name, int blocking){ + int opt; + int error = 0; + int retry = 5; + int sfd = -1; + int trueint; + int v1, v2; + int write_to_disk = 0; + char *tmp_name = NULL; + struct sockaddr_storage addr, recv_addr; + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr; + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr; + unsigned int len = sizeof(struct sockaddr_storage); + int addr_size = 0; + comm_header_t *ch = NULL; + char *bdoc = NULL; + fd_set rset; + struct timeval tv; + xmlDocPtr tmp_doc = NULL; + + CCSENTER("broadcast_for_doc"); + + try_again: + if(!master_doc){ + log_printf(LOG_ERR, "No master_doc!!!\n"); + exit(EXIT_FAILURE); + } + + if(quorate && !cluster_name){ + log_printf(LOG_ERR, "Node is part of quorate cluster, but the cluster name is unknown.\n"); + log_printf(LOG_ERR, " Unable to validate remote config files. 
Refusing connection.\n"); + error = -ECONNREFUSED; + goto fail; + } + + ch = malloc(sizeof(comm_header_t)); + if(!ch){ + error = -ENOMEM; + goto fail; + } + memset(ch, 0, sizeof(comm_header_t)); + + if(IPv6 && (sfd = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP)) <0){ + log_printf(LOG_ERR, "Unable to create IPv6 socket"); + error = -errno; + goto fail; + } + + if(!IPv6 && ((sfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)){ + log_printf(LOG_ERR, "Unable to create socket for broadcast"); + error = -errno; + goto fail; + } + + memset(&addr, 0, sizeof(struct sockaddr_storage)); + + trueint = 1; + if(IPv6){ + struct ipv6_mreq mreq; + + addr6->sin6_family = AF_INET6; + addr6->sin6_port = htons(backend_port); + + if(!multicast_address || !strcmp(multicast_address, "default")){ + log_printf(LOG_DEBUG, "Trying IPv6 multicast (default).\n"); + if(inet_pton(AF_INET6, "ff02::3:1", &(addr6->sin6_addr)) <= 0){ + log_printf(LOG_ERR, "Unable to convert multicast address"); + error = -errno; + goto fail; + } + } else { + log_printf(LOG_DEBUG, "Trying IPv6 multicast (%s).\n", multicast_address); + if(inet_pton(AF_INET6, multicast_address, &(addr6->sin6_addr)) <= 0){ + log_printf(LOG_ERR, "Unable to convert multicast address"); + error = -errno; + goto fail; + } + } + + memcpy(&mreq.ipv6mr_multiaddr, &(addr6->sin6_addr), sizeof(struct in6_addr)); + mreq.ipv6mr_interface = 0; + opt = 0; + + if(setsockopt(sfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + &opt, sizeof(opt)) < 0){ + log_printf(LOG_ERR, "Unable to %s loopback.\n", opt?"SET":"UNSET"); + error = -errno; + goto fail; + } + } else { + addr4->sin_family = AF_INET; + addr4->sin_port = htons(backend_port); + if(!multicast_address){ + log_printf(LOG_DEBUG, "Trying IPv4 broadcast.\n"); + + addr4->sin_addr.s_addr = INADDR_BROADCAST; + if((error = setsockopt(sfd, SOL_SOCKET, SO_BROADCAST, &trueint, sizeof(int)))){ + log_printf(LOG_ERR, "Unable to set socket options"); + error = -errno; + goto fail; + } else { + log_printf(LOG_DEBUG, " 
Broadcast enabled.\n"); + } + } else { + if(!strcmp(multicast_address, "default")){ + log_printf(LOG_DEBUG, "Trying IPv4 multicast (default).\n"); + if(inet_pton(AF_INET, "224.0.2.5", &(addr4->sin_addr)) <= 0){ + log_printf(LOG_ERR, "Unable to convert multicast address"); + error = -errno; + goto fail; + } + } else { + log_printf(LOG_DEBUG, "Trying IPv4 multicast (%s).\n", multicast_address); + if(inet_pton(AF_INET, multicast_address, &(addr4->sin_addr)) <= 0){ + log_printf(LOG_ERR, "Unable to convert multicast address"); + error = -errno; + goto fail; + } + } + opt = 0; + setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt)); + if(setsockopt(sfd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0){ + log_printf(LOG_ERR, "Unable to set multicast threshold.\n"); + } + } + } + addr_size = IPv6? sizeof(struct sockaddr_in6):sizeof(struct sockaddr_in); + + FD_ZERO(&rset); + + do { + ch->comm_type = COMM_BROADCAST; + + log_printf(LOG_DEBUG, "Sending broadcast.\n"); + swab_header(ch); + + if(sendto(sfd, (char *)ch, sizeof(comm_header_t), 0, + (struct sockaddr *)&addr, addr_size) < 0){ + log_printf(LOG_ERR, "Unable to perform sendto"); + if(retry > 0){ + retry--; + close(sfd); + free(ch); + sleep(2); + goto try_again; + } else { + error = -errno; + goto fail; + } + } + + srandom(getpid()); + FD_SET(sfd, &rset); + tv.tv_sec = 0; + + tv.tv_usec = 250000 + (random()%500000); +#if defined(__sparc__) + log_printf(LOG_DEBUG, "Select waiting %d usec\n", tv.tv_usec); +#else + log_printf(LOG_DEBUG, "Select waiting %ld usec\n", tv.tv_usec); +#endif + while((error = select(sfd+1, &rset, NULL,NULL, &tv))){ + log_printf(LOG_DEBUG, "Select returns %d\n", error); + if(error < 0){ + log_printf(LOG_ERR, "Select failed"); + error = -errno; + goto fail; + } + if(error){ + log_printf(LOG_DEBUG, "Checking broadcast response.\n"); + error = 0; + recvfrom(sfd, (char *)ch, sizeof(comm_header_t), MSG_PEEK, + (struct sockaddr *)&recv_addr, (socklen_t *)&len); + swab_header(ch); + 
if(!ch->comm_payload_size || ch->comm_error){ + /* clean out this reply by not using MSG_PEEK */ + recvfrom(sfd, (char *)ch, sizeof(comm_header_t), 0, + (struct sockaddr *)&recv_addr, (socklen_t *)&len); + error = -ENODATA; + FD_SET(sfd, &rset); + goto reset_timer; + } + bdoc = malloc(ch->comm_payload_size + sizeof(comm_header_t)); + if(!bdoc){ + error = -ENOMEM; + goto fail; + } + memset(bdoc, 0, ch->comm_payload_size + sizeof(comm_header_t)); + /* ATTENTION -- potential for incomplete package */ + recvfrom(sfd, bdoc, ch->comm_payload_size + sizeof(comm_header_t), + 0, (struct sockaddr *)&recv_addr, &len); + tmp_doc = xmlParseMemory(bdoc+sizeof(comm_header_t), + ch->comm_payload_size); + if(!tmp_doc){ + log_printf(LOG_ERR, "Unable to parse remote configuration.\n"); + free(bdoc); bdoc = NULL; + goto reset_timer; + } + + tmp_name = get_cluster_name(tmp_doc); + log_printf(LOG_DEBUG, " Given cluster name = %s\n", cluster_name); + log_printf(LOG_DEBUG, " Remote cluster name= %s\n", tmp_name); + if(!tmp_name){ + log_printf(LOG_ERR, "Unable to find cluster name in remote configuration.\n"); + free(bdoc); bdoc = NULL; + xmlFreeDoc(tmp_doc); tmp_doc = NULL; + goto reset_timer; + } else if(cluster_name && strcmp(cluster_name, tmp_name)){ + log_printf(LOG_DEBUG, "Remote and local configuration have different cluster names.\n"); + log_printf(LOG_DEBUG, "Skipping...\n"); + free(tmp_name); tmp_name = NULL; + free(bdoc); bdoc = NULL; + xmlFreeDoc(tmp_doc); tmp_doc = NULL; + goto reset_timer; + } + free(tmp_name); tmp_name = NULL; + if(!master_doc->od_doc){ + if((v2 = get_doc_version(tmp_doc)) >= 0){ + log_printf(LOG_INFO, "Remote configuration copy (version = %d) found.\n", v2); + master_doc->od_doc = tmp_doc; + tmp_doc = NULL; + write_to_disk = 1; + } + } else { + if(((v1 = get_doc_version(master_doc->od_doc)) >= 0) && + ((v2 = get_doc_version(tmp_doc)) >= 0)){ + if(ch->comm_flags & COMM_BROADCAST_FROM_QUORATE){ + log_printf(LOG_INFO, "Remote configuration copy is from quorate 
node.\n"); + log_printf(LOG_INFO, " Local version # : %d\n", v1); + log_printf(LOG_INFO, " Remote version #: %d\n", v2); + if(v1 != v2){ + log_printf(LOG_INFO, "Switching to remote copy.\n"); + } + if(master_doc->od_refs){ + open_doc_t *tmp_odoc; + if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ + error = -ENOMEM; + goto fail; + } + memset(tmp_odoc, 0, sizeof(open_doc_t)); + tmp_odoc->od_doc = tmp_doc; + master_doc = tmp_odoc; + } else { + xmlFreeDoc(master_doc->od_doc); + master_doc->od_doc = tmp_doc; + } + tmp_doc = NULL; + write_to_disk = 1; + goto out; + } else if(v2 > v1){ + log_printf(LOG_INFO, "Remote configuration copy is newer than local copy.\n"); + log_printf(LOG_INFO, " Local version # : %d\n", v1); + log_printf(LOG_INFO, " Remote version #: %d\n", v2); + if(master_doc->od_refs){ + open_doc_t *tmp_odoc; + if(!(tmp_odoc = malloc(sizeof(open_doc_t)))){ + error = -ENOMEM; + goto fail; + } + memset(tmp_odoc, 0, sizeof(open_doc_t)); + tmp_odoc->od_doc = tmp_doc; + master_doc = tmp_odoc; + } else { + xmlFreeDoc(master_doc->od_doc); + master_doc->od_doc = tmp_doc; + } + tmp_doc = NULL; + write_to_disk = 1; + } + } else { + xmlFreeDoc(tmp_doc); + tmp_doc = NULL; + } + } + free(bdoc); bdoc = NULL; + } + FD_SET(sfd, &rset); + /* select will alter the timeout */ + reset_timer: + tv.tv_sec = 0; + tv.tv_usec = 250000 + (random()%500000); +#if defined(__sparc__) + log_printf(LOG_DEBUG, "Select waiting %d usec\n", tv.tv_usec); +#else + log_printf(LOG_DEBUG, "Select waiting %ld usec\n", tv.tv_usec); +#endif + } + } while(blocking && !master_doc); + out: + if(error){ + goto fail; + } + + if(write_to_disk){ + struct stat stat_buf; + mode_t old_mode; + FILE *f; + /* We did not have a copy available or we found a newer one, so write it out */ + + /* ATTENTION -- its bad if we fail here, because we have an in-memory version ** + ** but it has not been written to disk....................................... 
*/ + if(stat(DEFAULT_CONFIG_DIR, &stat_buf)){ + if(mkdir(DEFAULT_CONFIG_DIR, S_IRWXU | S_IRWXG)){ + log_printf(LOG_ERR, "Unable to create directory " DEFAULT_CONFIG_DIR); + error = -errno; + goto fail; + } + } else if(!S_ISDIR(stat_buf.st_mode)){ + log_printf(LOG_ERR, DEFAULT_CONFIG_DIR " is not a directory.\n"); + error = -ENOTDIR; + goto fail; + } + + old_mode = umask(026); + f = fopen(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE, "w"); + umask(old_mode); + if(!f){ + log_printf(LOG_ERR, "Unable to open " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); + error = -errno; + goto fail; + } + if(xmlDocDump(f, master_doc->od_doc) < 0){ + error = -EIO; + fclose(f); + goto fail; + } + fclose(f); + } + + fail: + if(ch) free(ch); + if(bdoc) free(bdoc); + if(tmp_doc) xmlFreeDoc(tmp_doc); + if(sfd >= 0) close(sfd); + CCSEXIT("broadcast_for_doc"); + return error; +} + +/** + * process_connect: process a connect request + * @afd: accepted socket connection + * @cluster_name: optional cluster name + * + * Returns: 0 on success, < 0 on error + */ +static int process_connect(comm_header_t *ch, char *cluster_name){ + int i=0, error = 0; + int bcast_needed = 0; + char *tmp_name = NULL; + time_t now; + + CCSENTER("process_connect"); + + ch->comm_payload_size = 0; + + log_printf(LOG_DEBUG, "Given cluster name is = %s\n", cluster_name); + + if(!ocs){ + /* this will never be freed - unless exit */ + ocs = malloc(sizeof(open_connection_t *)*MAX_OPEN_CONNECTIONS); + if(!ocs){ + error = -ENOMEM; + goto fail; + } + memset(ocs, 0, sizeof(open_connection_t *)*MAX_OPEN_CONNECTIONS); + } + + if(!quorate && !(ch->comm_flags & COMM_CONNECT_FORCE)){ + log_printf(LOG_INFO, "Cluster is not quorate. Refusing connection.\n"); + error = -ECONNREFUSED; + goto fail; + } + + if(!master_doc){ + /* ATTENTION -- signal could come at any time. 
It may be better to ** + ** malloc to different var, then copy to master_doc when done */ + master_doc = malloc(sizeof(open_doc_t)); + if(!master_doc){ + error = -ENOMEM; + goto fail; + } + memset(master_doc, 0, sizeof(open_doc_t)); + } + + if(!master_doc->od_doc){ + master_doc->od_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); + if(!master_doc->od_doc){ + log_printf(LOG_INFO, "Unable to parse " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE "\n"); + log_printf(LOG_INFO, "Searching cluster for valid copy.\n"); + } else if((error = get_doc_version(master_doc->od_doc)) < 0){ + log_printf(LOG_ERR, "Unable to get config_version from " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); + log_printf(LOG_ERR, "Discarding data and searching for valid copy.\n"); + xmlFreeDoc(master_doc->od_doc); + master_doc->od_doc = NULL; + } else if(!(tmp_name = get_cluster_name(master_doc->od_doc))){ + log_printf(LOG_ERR, "Unable to get cluster name from " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); + log_printf(LOG_ERR, "Discarding data and searching for valid copy.\n"); + xmlFreeDoc(master_doc->od_doc); + master_doc->od_doc = NULL; + } else if(cluster_name && strcmp(cluster_name, tmp_name)){ + log_printf(LOG_ERR, "Given cluster name does not match local " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); + log_printf(LOG_ERR, "Discarding data and searching for matching copy.\n"); + xmlFreeDoc(master_doc->od_doc); + master_doc->od_doc = NULL; + free(tmp_name); tmp_name = NULL; + } else if(set_ccs_logging(master_doc->od_doc) < 0){ + log_printf(LOG_ERR, "Unable to set logging parameters.\n"); + } else { /* Either the names match, or a name wasn't specified. */ + log_printf(LOG_INFO, DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE " (cluster name = %s, version = %d) found.\n", + tmp_name, error); + /* We must check with the others to make sure this is valid. 
*/ + } + if (!no_manager_opt) + bcast_needed = 1; + error = 0; + } else { + tmp_name = get_cluster_name(master_doc->od_doc); + + /* ATTENTION -- if not quorate, consider swapping out in-memory config ** + ** for the config of the name specified............................... */ + + if(cluster_name && strcmp(cluster_name, tmp_name)){ + log_printf(LOG_ERR, "Request for configuration with cluster name, %s\n", cluster_name); + log_printf(LOG_ERR, " However, a configuration with cluster name, %s, is already loaded.\n", + tmp_name); + error = -EINVAL; + goto fail; + } + if(!quorate){ + bcast_needed = 1; + } + } + + if(cluster_name && !tmp_name){ + tmp_name = strdup(cluster_name); + if(!tmp_name){ + error = -ENOMEM; + goto fail; + } + } + + log_printf(LOG_DEBUG, "Blocking is %s.\n", + (ch->comm_flags & COMM_CONNECT_BLOCKING)? "SET": "UNSET"); + log_printf(LOG_DEBUG, "Flags = 0x%x\n", ch->comm_flags); + + /* Need to broadcast regardless (unless quorate) to check version # */ + if(bcast_needed){ + log_printf(LOG_DEBUG, "Broadcast is neccessary.\n"); + } + if(bcast_needed && + (error = broadcast_for_doc(tmp_name, ch->comm_flags & COMM_CONNECT_BLOCKING)) && + !master_doc->od_doc){ + log_printf(LOG_ERR, "Broadcast for config file failed: %s\n", strerror(-error)); + goto fail; + } + error = 0; + + if(!master_doc || !master_doc->od_doc){ + log_printf(LOG_ERR, "The appropriate config file could not be loaded.\n"); + error = -ENODATA; + goto fail; + } + + if(update_required){ + log_printf(LOG_DEBUG, "Update is required.\n"); + if((error = update_config())){ + log_printf(LOG_ERR, "Failed to update config file, required by cluster.\n"); + /* ATTENTION -- remove all open_doc_t's ? 
*/ + goto fail; + } + } + + /* Locate the connection descriptor */ + now = time(NULL); + for(i=0; i < MAX_OPEN_CONNECTIONS; i++){ + if (!ocs[i]) + continue; + if (now >= ocs[i]->oc_expire) { + log_printf(LOG_DEBUG, "Recycling connection descriptor %d: Expired\n", + ocs[i]->oc_desc ); + _cleanup_descriptor(i); + } + } + + for(i=0; i < MAX_OPEN_CONNECTIONS; i++){ + if(!ocs[i]) + break; + } + + if(i >= MAX_OPEN_CONNECTIONS){ + error = -EAGAIN; + goto fail; + } + + ocs[i] = (open_connection_t *)malloc(sizeof(open_connection_t)); + if(!ocs[i]){ + error = -ENOMEM; + goto fail; + } + + memset(ocs[i], 0, sizeof(open_connection_t)); + + master_doc->od_refs++; + ocs[i]->oc_odoc = master_doc; + ocs[i]->oc_ctx = xmlXPathNewContext(ocs[i]->oc_odoc->od_doc); + ocs[i]->oc_expire = now + DEFAULT_EXPIRE; + + /* using error as a temp var */ + error = i + _descbase++ * MAX_OPEN_CONNECTIONS; + if (error > INT_MAX || error < 0) { + error = i; + _descbase = 0; + } + ocs[i]->oc_desc = error; + + /* reset error */ + error = 0; + + if(!ocs[i]->oc_ctx){ + ocs[i]->oc_odoc->od_refs--; + free(ocs[i]); + log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); + error = -EIO; /* ATTENTION -- what should this be? 
*/ + goto fail; + } + + /* return desc to requestor */ + + fail: + if(master_doc && master_doc->od_doc == NULL){ + free(master_doc); + master_doc = NULL; + } + if(tmp_name){ + free(tmp_name); + } + if(error){ + ch->comm_error = error; + } else { + ch->comm_desc = ocs[i]->oc_desc; + } + CCSEXIT("process_connect"); + return error; +} + + +static inline void +_cleanup_descriptor(int desc) +{ + open_doc_t *tmp_odoc; + + if(ocs[desc]->oc_ctx){ + xmlXPathFreeContext(ocs[desc]->oc_ctx); + } + if(ocs[desc]->oc_cwp){ + free(ocs[desc]->oc_cwp); + } + if(ocs[desc]->oc_query){ + free(ocs[desc]->oc_query); + } + tmp_odoc = ocs[desc]->oc_odoc; + if(tmp_odoc->od_refs < 1){ + log_printf(LOG_ERR, "Number of references on an open doc should never be < 1.\n"); + log_printf(LOG_ERR, "This is a fatal error. Exiting...\n"); + exit(EXIT_FAILURE); + } + if(tmp_odoc != master_doc && tmp_odoc->od_refs == 1){ + log_printf(LOG_DEBUG, "No more references on version %d of config file, freeing...\n", + get_doc_version(tmp_odoc->od_doc)); + xmlFreeDoc(tmp_odoc->od_doc); + free(tmp_odoc); + } else { + tmp_odoc->od_refs--; + } + + free(ocs[desc]); + ocs[desc] = NULL; +} + + +/** + * process_disconnect: close an open session + * @afd: accepted socket connection + * @desc: descriptor describing the open connection + * + * This fuction frees all memory associated with an open session. + * + * Returns: 0 on success, < 0 on error + */ +static int process_disconnect(comm_header_t *ch){ + int desc = dindex(ch->comm_desc); + int error=0; + CCSENTER("process_disconnect"); + + ch->comm_payload_size = 0; + + if(desc < 0){ + log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", desc); + log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); + error = -EBADR; + goto fail; + } + + if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ + /* send failure to requestor ? 
*/ + log_printf(LOG_ERR, "Attempt to close an unopened CCS descriptor (%d).\n", + ch->comm_desc); + + error = -EBADR; + goto fail; + } else { + _cleanup_descriptor(desc); + } + + fail: + if(error){ + ch->comm_error = error; + } else { + ch->comm_desc = -1; + } + CCSEXIT("process_disconnect"); + return error; +} + +/* + * _process_get + * @ch + * @payload + * + * This function runs the xml query. If the query is different from the + * previous query, it will always fill the payload with the first match. + * If the current query and the previous query are the same, it fills the + * payload with next match. If the last of all possible matches was + * returned by the previous query and the current query is the same, + * the payload will be filled with the 1st match and 1 will be returned + * as the result of the function. + * + * Returns: -EXXX on error, 1 if restarting list, 0 otherwise + */ +static int _process_get(comm_header_t *ch, char **payload){ + int error = 0, desc = dindex(ch->comm_desc); + xmlXPathObjectPtr obj = NULL; + char *query = NULL; + + CCSENTER("_process_get"); + if(!ch->comm_payload_size){ + log_printf(LOG_ERR, "process_get: payload size is zero.\n"); + error = -EINVAL; + goto fail; + } + + if(ch->comm_desc < 0){ + log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); + log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); + error = -EBADR; + goto fail; + } + + if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ + log_printf(LOG_ERR, "process_get: Invalid connection descriptor received.\n"); + error = -EBADR; + goto fail; + } + + if(ocs[desc]->oc_query && !strcmp(*payload,ocs[desc]->oc_query)){ + ocs[desc]->oc_index++; + log_printf(LOG_DEBUG, "Index = %d\n",ocs[desc]->oc_index); + log_printf(LOG_DEBUG, " Query = %s\n", *payload); + } else { + log_printf(LOG_DEBUG, "Index reset (new query).\n"); + log_printf(LOG_DEBUG, " Query = %s\n", *payload); + ocs[desc]->oc_index = 0; + if(ocs[desc]->oc_query){ + 
free(ocs[desc]->oc_query); + } + ocs[desc]->oc_query = (char *)strdup(*payload); + } + + /* ATTENTION -- should path expansion go before index inc ? */ + if(((ch->comm_payload_size > 1) && + ((*payload)[0] == '/')) || + !ocs[desc]->oc_cwp){ + log_printf(LOG_DEBUG, "Query involves absolute path or cwp is not set.\n"); + query = (char *)strdup(*payload); + if(!query){ + error = -ENOMEM; + goto fail; + } + } else { + /* +2 because of NULL and '/' character */ + log_printf(LOG_DEBUG, "Query involves relative path.\n"); + query = malloc(strlen(*payload)+strlen(ocs[desc]->oc_cwp)+2); + if(!query){ + error = -ENOMEM; + goto fail; + } + sprintf(query, "%s/%s", ocs[desc]->oc_cwp, *payload); + } + + /* Bump expiration time */ + ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; + + obj = xmlXPathEvalExpression((xmlChar *)query, ocs[desc]->oc_ctx); + if(obj){ + log_printf(LOG_DEBUG, "Obj type = %d (%s)\n", obj->type, (obj->type == 1)?"XPATH_NODESET":""); + log_printf(LOG_DEBUG, "Number of matches: %d\n", (obj->nodesetval)?obj->nodesetval->nodeNr:0); + if(obj->nodesetval && (obj->nodesetval->nodeNr > 0) ){ + xmlNodePtr node; + int size=0; + int nnv=0; /* name 'n' value */ + + if(ocs[desc]->oc_index >= obj->nodesetval->nodeNr){ + ocs[desc]->oc_index = 0; + error = 1; + log_printf(LOG_DEBUG, "Index reset to zero (end of list).\n"); + } + + node = obj->nodesetval->nodeTab[ocs[desc]->oc_index]; + + log_printf(LOG_DEBUG, "Node (%s) type = %d (%s)\n", node->name, node->type, + (node->type == 1)? "XML_ELEMENT_NODE": + (node->type == 2)? 
"XML_ATTRIBUTE_NODE":""); + + if(!node) { + log_printf(LOG_DEBUG, "No content found.\n"); + error = -ENODATA; + goto fail; + } + + if(((node->type == XML_ATTRIBUTE_NODE) && strstr(query, "@*")) || + ((node->type == XML_ELEMENT_NODE) && strstr(query, "child::*"))){ + /* add on the trailing NULL and the '=' separator for a list of attrs + or an element node + CDATA*/ + if (node->children && node->children->content) + size = strlen((char *)node->children->content) + + strlen((char *)node->name)+2; + else + size = strlen((char *)node->name)+2; + nnv= 1; + } else { + if (node->children && node->children->content) { + size = strlen((char *)node->children->content)+1; + } else { + error = -ENODATA; + goto fail; + } + } + + if(size <= ch->comm_payload_size){ /* do we already have enough space? */ + log_printf(LOG_DEBUG, "No extra space needed.\n"); + if(nnv){ + sprintf(*payload, "%s=%s", node->name, node->children ? + (char *)node->children->content:""); + } else { + sprintf(*payload, "%s", node->children ? node->children->content : + node->name); + } + + } else { + log_printf(LOG_DEBUG, "Extra space needed.\n"); + free(*payload); + *payload = (char *)malloc(size); + if(!*payload){ + error = -ENOMEM; + goto fail; + } + if(nnv){ + sprintf(*payload, "%s=%s", node->name, node->children ? + (char *)node->children->content:""); + } else { + sprintf(*payload, "%s", node->children ? 
node->children->content : + node->name); + } + } + log_printf(LOG_DEBUG, "Query results:: %s\n", *payload); + ch->comm_payload_size = size; + } else { + log_printf(LOG_DEBUG, "No nodes found.\n"); + ch->comm_payload_size = 0; + error = -ENODATA; + goto fail; + } + } else { + log_printf(LOG_ERR, "Error: unable to evaluate xpath query \"%s\"\n", *payload); + error = -EINVAL; + goto fail; + } + + fail: + if(obj){ + xmlXPathFreeObject(obj); + } + if(error < 0){ + ch->comm_error = error; + ch->comm_payload_size = 0; + } + if(query) { free(query); } + CCSEXIT("_process_get"); + return error; +} + +static int process_get(comm_header_t *ch, char **payload){ + int error; + CCSENTER("process_get"); + + error = _process_get(ch, payload); + + CCSEXIT("process_get"); + return (error < 0)? error: 0; +} + +static int process_get_list(comm_header_t *ch, char **payload){ + int error; + CCSENTER("process_get_list"); + + error = _process_get(ch, payload); + if(error){ + ch->comm_payload_size = 0; + if(ocs && ocs[dindex(ch->comm_desc)]) + ocs[dindex(ch->comm_desc)]->oc_index = -1; + } + + CCSEXIT("process_get_list"); + return (error < 0)? 
error: 0; +} + +static int process_set(comm_header_t *ch, char *payload){ + int error = 0; + int desc = dindex(ch->comm_desc); + + CCSENTER("process_set"); + if(!ch->comm_payload_size){ + log_printf(LOG_ERR, "process_set: payload size is zero.\n"); + error = -EINVAL; + goto fail; + } + + if(ch->comm_desc < 0){ + log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); + log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); + error = -EBADR; + goto fail; + } + + if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ + log_printf(LOG_ERR, "process_set: Invalid connection descriptor received.\n"); + error = -EBADR; + goto fail; + } + + error = -ENOSYS; + + fail: + free(payload); + ch->comm_payload_size = 0; + if(error){ + ch->comm_error = error; + } + CCSEXIT("process_set"); + return error; +} + + +static int process_get_state(comm_header_t *ch, char **payload){ + int error = 0, desc = dindex(ch->comm_desc); + char *load = NULL; + + CCSENTER("process_get_state"); + if(ch->comm_payload_size){ + log_printf(LOG_ERR, "process_get_state: payload size is nonzero.\n"); + error = -EINVAL; + goto fail; + } + + if(ch->comm_desc < 0){ + log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); + log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); + error = -EBADR; + goto fail; + } + + if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ + log_printf(LOG_ERR, "process_get_state: Invalid connection descriptor received.\n"); + error = -EBADR; + goto fail; + } + + if(ocs[desc]->oc_cwp && ocs[desc]->oc_query){ + int size = strlen(ocs[desc]->oc_cwp) + + strlen(ocs[desc]->oc_query) + 2; + log_printf(LOG_DEBUG, "Both cwp and query are set.\n"); + load = malloc(size); + if(!load){ + error = -ENOMEM; + goto fail; + } + strcpy(load, ocs[desc]->oc_cwp); + strcpy(load+strlen(ocs[desc]->oc_cwp)+1, ocs[desc]->oc_query); + ch->comm_payload_size = size; + } else if(ocs[desc]->oc_cwp){ + log_printf(LOG_DEBUG, 
"Only cwp is set.\n"); + load = (char *)strdup(ocs[desc]->oc_cwp); + if(!load){ + error = -ENOMEM; + goto fail; + } + ch->comm_payload_size = strlen(load)+1; + } else if(ocs[desc]->oc_query){ + int size = strlen(ocs[desc]->oc_query) + 2; + log_printf(LOG_DEBUG, "Only query is set.\n"); + load = malloc(size); + if(!load){ + error = -ENOMEM; + goto fail; + } + memset(load, 0, size); + strcpy(load+1, ocs[desc]->oc_query); + ch->comm_payload_size = size; + } + + ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; + *payload = load; + + fail: + if(error){ + if(load) { free(load); } + ch->comm_error = error; + ch->comm_payload_size = 0; + } + CCSEXIT("process_get_state"); + return error; +} + + +static int process_set_state(comm_header_t *ch, char *payload){ + int error = 0, desc = dindex(ch->comm_desc); + + CCSENTER("process_set_state"); + if(!ch->comm_payload_size){ + log_printf(LOG_ERR, "process_set_state: payload size is zero.\n"); + error = -EINVAL; + goto fail; + } + + if(ch->comm_desc < 0){ + log_printf(LOG_ERR, "Invalid descriptor specified (%d).\n", ch->comm_desc); + log_printf(LOG_ERR, "Someone may be attempting something evil.\n"); + error = -EBADR; + goto fail; + } + + if(!ocs || !ocs[desc] || (ocs[desc]->oc_desc != ch->comm_desc)){ + log_printf(LOG_ERR, "process_set_state: Invalid connection descriptor received.\n"); + error = -EBADR; + goto fail; + } + + if(ocs[desc]->oc_cwp){ + free(ocs[desc]->oc_cwp); + ocs[desc]->oc_cwp = NULL; + } + + if((ch->comm_flags & COMM_SET_STATE_RESET_QUERY) && ocs[desc]->oc_query){ + free(ocs[desc]->oc_query); + ocs[desc]->oc_query = NULL; + } + + ocs[desc]->oc_expire = time(NULL) + DEFAULT_EXPIRE; + ocs[desc]->oc_cwp = (char *)strdup(payload); + + fail: + ch->comm_payload_size = 0; + if(error){ + ch->comm_error = error; + } + + CCSEXIT("process_set_state"); + return error; +} + + +/** + * process_request + * @afd + * + * This function operates as a switch, passing the request to the + * appropriate function. 
+ * + * Returns: 0 on success, < 0 on error + */ +int process_request(int afd){ + int error=0; + comm_header_t *ch = NULL, *tmp_ch; + char *payload = NULL; + + CCSENTER("process_request"); + + if(!(ch = (comm_header_t *)malloc(sizeof(comm_header_t)))){ + error = -ENOMEM; + goto fail; + } + + error = read(afd, ch, sizeof(comm_header_t)); + if(error < 0){ + log_printf(LOG_ERR, "Unable to read comm_header_t"); + goto fail; + } else if(error < sizeof(comm_header_t)){ + log_printf(LOG_ERR, "Unable to read complete comm_header_t.\n"); + error = -EBADE; + goto fail; + } + + if(ch->comm_payload_size){ + if(!(payload = (char *)malloc(ch->comm_payload_size))){ + error = -ENOMEM; + goto fail; + } + error = read(afd, payload, ch->comm_payload_size); + if(error < 0){ + log_printf(LOG_ERR, "Unable to read payload"); + goto fail; + } else if(error < ch->comm_payload_size){ + log_printf(LOG_ERR, "Unable to read complete payload.\n"); + error = -EBADE; + goto fail; + } + } + + switch(ch->comm_type){ + case COMM_CONNECT: + if((error = process_connect(ch, payload)) < 0){ + log_printf(LOG_ERR, "Error while processing connect: %s\n", strerror(-error)); + goto fail; + } + break; + case COMM_DISCONNECT: + if((error = process_disconnect(ch)) < 0){ + log_printf(LOG_ERR, "Error while processing disconnect: %s\n", strerror(-error)); + goto fail; + } + break; + case COMM_GET: + if((error = process_get(ch, &payload)) < 0){ + if(error != -ENODATA){ + log_printf(LOG_ERR, "Error while processing get: %s\n", strerror(-error)); + } + goto fail; + } + break; + case COMM_GET_LIST: + if((error = process_get_list(ch, &payload)) < 0){ + if(error != -ENODATA){ + log_printf(LOG_ERR, "Error while processing get: %s\n", strerror(-error)); + } + goto fail; + } + break; + case COMM_SET: + if((error = process_set(ch, payload)) < 0){ + log_printf(LOG_ERR, "Error while processing set: %s\n", strerror(-error)); + goto fail; + } + break; + case COMM_GET_STATE: + if((error = process_get_state(ch, &payload)) < 0){ + 
log_printf(LOG_ERR, "Error while processing get_state: %s\n", strerror(-error)); + goto fail; + } + break; + case COMM_SET_STATE: + if((error = process_set_state(ch, payload)) < 0){ + log_printf(LOG_ERR, "Error while processing set_state: %s\n", strerror(-error)); + goto fail; + } + break; + default: + log_printf(LOG_ERR, "Unknown connection request received.\n"); + error = -EINVAL; + ch->comm_error = error; + ch->comm_payload_size = 0; + } + + if(ch->comm_payload_size){ + log_printf(LOG_DEBUG, "Reallocating transfer buffer.\n"); + tmp_ch = (comm_header_t *) + realloc(ch,sizeof(comm_header_t)+ch->comm_payload_size); + + if(tmp_ch) { ch = tmp_ch; } else { + log_printf(LOG_ERR, "Not enough memory to complete request.\n"); + error = -ENOMEM; + goto fail; + } + memcpy((char *)ch+sizeof(comm_header_t), payload, ch->comm_payload_size); + } + + fail: + error = write(afd, ch, sizeof(comm_header_t)+ch->comm_payload_size); + if(error < 0){ + if (errno == EINTR) + goto fail; + if (errno == EPIPE) { + error = 0; + } else { + log_printf(LOG_ERR, "Unable to write package back to sender"); + } + } else if(error < (sizeof(comm_header_t)+ch->comm_payload_size)){ + log_printf(LOG_ERR, "Unable to write complete package.\n"); + error = -EBADE; + goto fail; + } else { + error = 0; + } + + if(ch){ free(ch); } + if(payload){ free(payload); } + + CCSEXIT("process_request"); + return error; +} + + +/** + * process_broadcast + * @sfd: the UDP socket + * + * Returns: 0 on success, < 0 on failure + */ +int process_broadcast(int sfd){ + int error = 0; + comm_header_t *ch = NULL; + xmlChar *payload = NULL; + char *buffer = NULL; + struct sockaddr_storage addr; + unsigned int len = sizeof(struct sockaddr_storage); /* value/result for recvfrom */ + int sendlen; + int discard = 0; + + CCSENTER("process_broadcast"); + + ch = malloc(sizeof(comm_header_t)); + if(!ch){ + error = -ENOMEM; + goto fail; + } + memset(ch, 0, sizeof(comm_header_t)); + memset(&addr, 0, sizeof(struct sockaddr_storage)); /* 
just to make sure */ + + log_printf(LOG_DEBUG, "Waiting to receive broadcast request.\n"); + if(recvfrom(sfd, ch, sizeof(comm_header_t), 0, (struct sockaddr *)&addr, &len) < 0){ + log_printf(LOG_ERR, "Unable to perform recvfrom"); + error = -errno; + goto fail; + } + swab_header(ch); + + if(ch->comm_type != COMM_BROADCAST){ + /* Either someone is pinging this port, or there is an older version ** + ** of ccs trying to get bcast response. Either way, we should not ** + ** respond to them.................................................. */ + log_printf(LOG_DEBUG, "Received invalid request on broadcast port. %x\n",ch->comm_type); + error = -EINVAL; + goto fail; + } + + /* need to ignore my own broadcasts */ + + if(ch->comm_payload_size){ + /* cluster name was sent, need to read it */ + } + + if(!master_doc){ + discard = 1; + log_printf(LOG_DEBUG, "master_doc not loaded. Attempting to load it.\n"); + if(!(master_doc = malloc(sizeof(open_doc_t)))){ + error = -ENOMEM; + goto fail; + } + memset(master_doc, 0, sizeof(open_doc_t)); + master_doc->od_doc = xmlParseFile(DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE); + if(!master_doc->od_doc){ + free(master_doc); + master_doc = NULL; + log_printf(LOG_ERR, "Unable to parse " DEFAULT_CONFIG_DIR "/" DEFAULT_CONFIG_FILE ".\n"); + error = -ENODATA; + goto fail; + } + log_printf(LOG_DEBUG, "master_doc found and loaded.\n"); + } else if(update_required){ + log_printf(LOG_DEBUG, "Update is required.\n"); + if((error = update_config())){ + log_printf(LOG_ERR, "Failed to update config file, required by cluster.\n"); + /* ATTENTION -- remove all open_doc_t's ? 
*/ + goto fail; + } + } + + /* allocates space for the payload */ + xmlDocDumpFormatMemory(master_doc->od_doc, + &payload, + &(ch->comm_payload_size), + 0); + if(!ch->comm_payload_size){ + error = -ENOMEM; + log_printf(LOG_ERR, "Document dump to memory failed.\n"); + goto fail; + } + + buffer = malloc(ch->comm_payload_size + sizeof(comm_header_t)); + if(!buffer){ + error = -ENOMEM; + goto fail; + } + + if(quorate){ + ch->comm_flags |= COMM_BROADCAST_FROM_QUORATE; + } + + swab_header(ch); + memcpy(buffer, ch, sizeof(comm_header_t)); + swab_header(ch); /* Swab back to dip into ch for payload_size */ + memcpy(buffer+sizeof(comm_header_t), payload, ch->comm_payload_size); + + log_printf(LOG_DEBUG, "Sending configuration (version %d)...\n", get_doc_version(master_doc->od_doc)); + sendlen = ch->comm_payload_size + sizeof(comm_header_t); + if(sendto(sfd, buffer, sendlen, 0, + (struct sockaddr *)&addr, (socklen_t)len) < 0){ + log_printf(LOG_ERR, "Sendto failed"); + error = -errno; + } + + fail: + if(buffer) free(buffer); + if(payload) free(payload); + if(ch) free(ch); + if(discard){ + if(master_doc && master_doc->od_doc) + xmlFreeDoc(master_doc->od_doc); + if(master_doc) free(master_doc); + master_doc = NULL; + } + CCSEXIT("process_broadcast"); + return error; +} diff --git a/config/daemons/ccsd/cnx_mgr.h b/config/daemons/ccsd/cnx_mgr.h new file mode 100644 index 0000000..220628a --- /dev/null +++ b/config/daemons/ccsd/cnx_mgr.h @@ -0,0 +1,7 @@ +#ifndef __CNX_MGR_DOT_H__ +#define __CNX_MGR_DOT_H__ + +int process_request(int afd); +int process_broadcast(int sfd); + +#endif /* __CNX_MGR_DOT_H__ */ diff --git a/config/daemons/ccsd/comm_headers.h b/config/daemons/ccsd/comm_headers.h new file mode 100644 index 0000000..4187fba --- /dev/null +++ b/config/daemons/ccsd/comm_headers.h @@ -0,0 +1,48 @@ +#ifndef __COMM_HEADERS_DOT_H__ +#define __COMM_HEADERS_DOT_H__ + +#include +#include + +/* Types of requests */ +#define COMM_CONNECT 1 +#define COMM_DISCONNECT 2 +#define COMM_GET 3 
+#define COMM_GET_LIST 4 +#define COMM_SET 5 +#define COMM_GET_STATE 6 +#define COMM_SET_STATE 7 +#define COMM_BROADCAST 8 +#define COMM_UPDATE 9 + +/* Request flags */ +#define COMM_CONNECT_FORCE 1 +#define COMM_CONNECT_BLOCKING 2 +#define COMM_SET_STATE_RESET_QUERY 4 +#define COMM_BROADCAST_FROM_QUORATE 8 +#define COMM_UPDATE_NOTICE 16 +#define COMM_UPDATE_NOTICE_ACK 32 +#define COMM_UPDATE_COMMIT 64 +#define COMM_UPDATE_COMMIT_ACK 128 + +typedef struct comm_header_s { + int comm_type; + int comm_flags; /* flags that tune a particular type of operation */ + int comm_desc; + int comm_error; + int comm_payload_size; +} comm_header_t; + +#define COMM_LOCAL_SOCKET "/var/run/cluster/ccsd.sock" + +static inline void swab_header(comm_header_t *head) { +#if __BYTE_ORDER == __BIG_ENDIAN + head->comm_type = bswap_32(head->comm_type); + head->comm_flags = bswap_32(head->comm_flags); + head->comm_desc = bswap_32(head->comm_desc); + head->comm_error = bswap_32(head->comm_error); + head->comm_payload_size = bswap_32(head->comm_payload_size); +#endif +} + +#endif /* __COMM_HEADERS_DOT_H__ */ diff --git a/config/daemons/ccsd/debug.h b/config/daemons/ccsd/debug.h new file mode 100644 index 0000000..4ed365c --- /dev/null +++ b/config/daemons/ccsd/debug.h @@ -0,0 +1,9 @@ +#ifndef __DEBUG_DOT_H__ +#define __DEBUG_DOT_H__ + +#define CCSENTER(x) log_printf(LOG_DEBUG, "Entering " x "\n") +#define CCSEXIT(x) log_printf(LOG_DEBUG, "Exiting " x "\n") + +extern int debug; + +#endif /* __DEBUG_DOT_H__ */ diff --git a/config/daemons/ccsd/globals.c b/config/daemons/ccsd/globals.c new file mode 100644 index 0000000..6f2e582 --- /dev/null +++ b/config/daemons/ccsd/globals.c @@ -0,0 +1,19 @@ +#include + +int ppid = 0; + +char *config_file_location = NULL; +char *lockfile_location = NULL; + +int frontend_port = 50006; +int backend_port = 50007; +int cluster_base_port = 50008; + +/* -1 = no preference, 0 = IPv4, 1 = IPv6 */ +int IPv6=-1; + +/* 1 = allow and use UNIX domain sockets for local ccs 
queries */ +int use_local = 1; + +char *multicast_address = NULL; +int ttl=1; diff --git a/config/daemons/ccsd/globals.h b/config/daemons/ccsd/globals.h new file mode 100644 index 0000000..91fbe53 --- /dev/null +++ b/config/daemons/ccsd/globals.h @@ -0,0 +1,23 @@ +#ifndef __GLOBALS_H__ +#define __GLOBALS_H__ + +#define DEFAULT_CCSD_LOCKFILE "/var/run/cluster/ccsd.pid" + +#define EXIT_MAGMA_PLUGINS 2 /* Magma plugins are not available */ +#define EXIT_CLUSTER_FAIL 3 /* General failure to connect to cluster */ +#define EXIT_LOCKFILE 4 /* Failed to create lock file */ + +extern int ppid; + +extern char *config_file_location; +extern char *lockfile_location; + +extern int frontend_port; +extern int backend_port; +extern int cluster_base_port; + +extern int IPv6; +extern int use_local; +extern char *multicast_address; +extern int ttl; +#endif /* __GLOBALS_H__ */ diff --git a/config/daemons/ccsd/misc.c b/config/daemons/ccsd/misc.c new file mode 100644 index 0000000..543dd31 --- /dev/null +++ b/config/daemons/ccsd/misc.c @@ -0,0 +1,288 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "comm_headers.h" +#include "debug.h" +#include "misc.h" + +volatile int quorate = 0; + +int update_required = 0; +pthread_mutex_t update_lock; + +open_doc_t *master_doc = NULL; + +/** + * do_simple_xml_query + * @ctx: xml context + * @query: "/cluster/@name" + * + * it only handles this kind of query + */ +static char *do_simple_xml_query(xmlXPathContextPtr ctx, char *query) { + xmlXPathObjectPtr obj = NULL; + xmlNodePtr node = NULL; + + CCSENTER("do_simple_xml_query"); + + obj = xmlXPathEvalExpression((xmlChar *)query, ctx); + if(!obj || !obj->nodesetval || (obj->nodesetval->nodeNr != 1)) + log_printf(LOG_DEBUG, "Error processing query: %s.\n", query); + else { + node = obj->nodesetval->nodeTab[0]; + if(node->type != XML_ATTRIBUTE_NODE) + 
log_printf(LOG_DEBUG, "Object returned is not of attribute type.\n"); + else { + if(!node->children->content || !strlen((char *)node->children->content)) + log_printf(LOG_DEBUG, "No content found.\n"); + else { + CCSEXIT("do_simple_xml_query"); + return strdup((char *)node->children->content); + } + } + } + + if(obj) + xmlXPathFreeObject(obj); + + CCSEXIT("do_simple_xml_query"); + return NULL; +} + +int get_doc_version(xmlDocPtr ldoc){ + int i; + int error = 0; + xmlXPathContextPtr ctx = NULL; + char *res = NULL; + + CCSENTER("get_doc_version"); + + ctx = xmlXPathNewContext(ldoc); + if(!ctx){ + log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); + error = -EIO; /* ATTENTION -- what should this be? */ + goto fail; + } + + res = do_simple_xml_query(ctx, "/cluster/@config_version"); + if(res) { + for(i=0; i < strlen(res); i++){ + if(!isdigit(res[i])){ + log_printf(LOG_ERR, "config_version is not a valid integer.\n"); + error = -EINVAL; + goto fail; + } + } + error = atoi(res); + } else + error = -EINVAL; + +fail: + + if(res) + free(res); + + if(ctx){ + xmlXPathFreeContext(ctx); + } + + CCSEXIT("get_doc_version"); + return error; +} + + +/** + * get_cluster_name + * @ldoc: + * + * The caller must remember to free the string that is returned. + * + * Returns: NULL on failure, (char *) otherwise + */ +char *get_cluster_name(xmlDocPtr ldoc){ + int error = 0; + char *rtn = NULL; + xmlXPathContextPtr ctx = NULL; + + CCSENTER("get_cluster_name"); + + ctx = xmlXPathNewContext(ldoc); + if(!ctx){ + log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); + error = -EIO; /* ATTENTION -- what should this be? */ + goto fail; + } + + rtn = do_simple_xml_query(ctx, "/cluster/@name"); + +fail: + + if(ctx){ + xmlXPathFreeContext(ctx); + } + CCSEXIT("get_cluster_name"); + return rtn; +} + +/** + * set_ccs_logging + * @ldoc: + * + * Returns: -1 on failure. NULL on success. 
+ */ +int set_ccs_logging(xmlDocPtr ldoc){ + int facility = SYSLOGFACILITY, loglevel = SYSLOGLEVEL, global_debug = 0; + char *res = NULL, *error = NULL; + xmlXPathContextPtr ctx = NULL; + unsigned int logmode; + + CCSENTER("set_ccs_logging"); + + ctx = xmlXPathNewContext(ldoc); + if(!ctx){ + log_printf(LOG_ERR, "Error: unable to create new XPath context.\n"); + return -1; + } + + logmode = logsys_config_mode_get(); + + if(!debug) { + res = do_simple_xml_query(ctx, "/cluster/logging/@debug"); + if(res) { + if(!strcmp(res, "on")) { + global_debug = 1; + } else + if(!strcmp(res, "off")) { + global_debug = 0; + } else + log_printf(LOG_ERR, "global debug: unknown value\n"); + free(res); + res=NULL; + } + + res = do_simple_xml_query(ctx, "/cluster/logging/logger_subsys[@subsys=\"CCS\"]/@debug"); + if(res) { + if(!strcmp(res, "on")) { + debug = 1; + } else + if(!strcmp(res, "off")) { /* debug from cmdline/envvars override config */ + debug = 0; + } else + log_printf(LOG_ERR, "subsys debug: unknown value\n"); + free(res); + res=NULL; + } else + debug = global_debug; /* global debug overrides subsystem only if latter is not specified */ + + res = do_simple_xml_query(ctx, "/cluster/logging/logger_subsys[@subsys=\"CCS\"]/@syslog_level"); + if(res) { + loglevel = logsys_priority_id_get (res); + if (loglevel < 0) + loglevel = SYSLOGLEVEL; + + if(!debug) { + if(loglevel == LOG_LEVEL_DEBUG) + debug = 1; + + logsys_config_priority_set (loglevel); + } + + free(res); + res=NULL; + } + } else + logsys_config_priority_set (LOG_LEVEL_DEBUG); + + res = do_simple_xml_query(ctx, "/cluster/logging/@to_stderr"); + if(res) { + if(!strcmp(res, "yes")) { + logmode |= LOG_MODE_OUTPUT_STDERR; + } else + if(!strcmp(res, "no")) { + logmode &= ~LOG_MODE_OUTPUT_STDERR; + } else + log_printf(LOG_ERR, "to_stderr: unknown value\n"); + free(res); + res=NULL; + } + + res = do_simple_xml_query(ctx, "/cluster/logging/@to_syslog"); + if(res) { + if(!strcmp(res, "yes")) { + logmode |= 
LOG_MODE_OUTPUT_SYSLOG_THREADED; + } else + if(!strcmp(res, "no")) { + logmode &= ~LOG_MODE_OUTPUT_SYSLOG_THREADED; + } else + log_printf(LOG_ERR, "to_syslog: unknown value\n"); + free(res); + res=NULL; + } + + res = do_simple_xml_query(ctx, "/cluster/logging/@to_file"); + if(res) { + if(!strcmp(res, "yes")) { + logmode |= LOG_MODE_OUTPUT_FILE; + } else + if(!strcmp(res, "no")) { + logmode &= ~LOG_MODE_OUTPUT_FILE; + } else + log_printf(LOG_ERR, "to_file: unknown value\n"); + free(res); + res=NULL; + } + + res = do_simple_xml_query(ctx, "/cluster/logging/@logfile"); + if(res) { + if(logsys_config_file_set(&error, res)) + log_printf(LOG_ERR, "logfile: unable to open %s for logging\n", res); + free(res); + res=NULL; + } else + log_printf(LOG_DEBUG, "logfile: use default built-in log file: %s\n", LOGDIR "/ccs.log"); + + res = do_simple_xml_query(ctx, "/cluster/logging/@syslog_facility"); + if(res) { + facility = logsys_facility_id_get (res); + if (facility < 0) { + log_printf(LOG_ERR, "syslog_facility: unknown value\n"); + facility = SYSLOGFACILITY; + } + + logsys_config_facility_set ("CCS", facility); + log_printf(LOG_DEBUG, "log_facility: %s (%d).\n", res, facility); + free(res); + res=NULL; + } + + if(ctx){ + xmlXPathFreeContext(ctx); + } + + if(logmode & LOG_MODE_BUFFER_BEFORE_CONFIG) { + log_printf(LOG_DEBUG, "logsys config enabled from set_ccs_logging\n"); + logmode &= ~LOG_MODE_BUFFER_BEFORE_CONFIG; + logmode |= LOG_MODE_FLUSH_AFTER_CONFIG; + logsys_config_mode_set (logmode); + } + + CCSEXIT("set_ccs_logging"); + return 0; +} diff --git a/config/daemons/ccsd/misc.h b/config/daemons/ccsd/misc.h new file mode 100644 index 0000000..0636c7e --- /dev/null +++ b/config/daemons/ccsd/misc.h @@ -0,0 +1,19 @@ +#ifndef __MISC_H__ +#define __MISC_H__ + +typedef struct open_doc { + int od_refs; + xmlDocPtr od_doc; +} open_doc_t; + + +extern volatile int quorate; +extern int update_required; +extern pthread_mutex_t update_lock; +extern open_doc_t *master_doc; + +char 
*get_cluster_name(xmlDocPtr ldoc); +int get_doc_version(xmlDocPtr ldoc); +int set_ccs_logging(xmlDocPtr ldoc); + +#endif /* __MISC_H__ */ diff --git a/config/daemons/man/Makefile b/config/daemons/man/Makefile new file mode 100644 index 0000000..cdb7712 --- /dev/null +++ b/config/daemons/man/Makefile @@ -0,0 +1,9 @@ +TARGET= + +include ../../../make/defines.mk + +ifdef legacy_code +TARGET += ccsd.8 +endif + +include $(OBJDIR)/make/man.mk diff --git a/config/daemons/man/ccsd.8 b/config/daemons/man/ccsd.8 new file mode 100644 index 0000000..bcf0957 --- /dev/null +++ b/config/daemons/man/ccsd.8 @@ -0,0 +1,74 @@ +.TH ccsd 8 + +.SH NAME +ccsd - manages the /etc/cluster/cluster.conf file + +.SH SYNOPSIS +.B ccsd +[\fIOPTION\fR].. + +.SH DESCRIPTION + +\fBccsd\fP is part of the Cluster Configuration System (CCS) and manages +the cluster.conf file in a cman cluster. It handles requests for +cluster.conf information made through libccs. It also keeps the +cluster.conf file in sync among cluster nodes based on the value of +cluster.conf:cluster/config_version. ccsd may replace the local +cluster.conf file if it discovers a newer version on another node. + +.SH OPTIONS +.TP +\fB-X\fP +Disable all cluster manager (cman) and inter-node interactions. Simply +respond to local libccs requests based on the current cluster.conf file. +.TP +\fB-4\fP +Use IPv4 for inter-node communication. By default, IPv6 is tried, then IPv4. +.TP +\fB-6\fP +Use IPv6 for inter-node communication. By default, IPv6 is tried, then IPv4. +.TP +\fB-I\fP +Force use of IP for local communication (disables use of UNIX domain sockets). +If set, \fBccsd\fP will use the specified inter-node communication protocol +(see the \fB-4\fP and \fB-6\fP options). If one is not specified, +IPv6 is tried, then IPv4. For backward compatibility, IP connections are +still allowed even when UNIX domain sockets are available. +.TP +\fB-h\fP +Help. Print out the usage syntax. 
+.TP +\fB-m <address>\fP +Used to specify the multicast address. The keyword "default" can be used, +in which case "ff02::3:1" is used for IPv6 and "224.0.2.5" is used for IPv4. + +If you are using IPv4, the default action is to use broadcast. Specifying +this option will cause multicast to be used in that instance. +.TP +\fB-n\fP +No daemon. Run in the foreground. +.TP +\fB-P <port identifier>:<port number>\fP +You have the option of specifying the port numbers used by ccsd. The port +identifier is either: b, c, or f. "b" is the port which ccsd attempts to +communicate with ccsd processes on other machines, via broadcast/multicast, to +obtain or validate its config file (cluster.conf). This is known as the backend +port. "c" is the base port number of two consecutive ports used by ccsd +processes to communicate cluster membership information. This is known as the +cluster base port. "f" is the port number that listens for information requests +from the CCS library (or programs using it). This is known as the frontend port. + +So, to change the frontend port one might specify \fI-P f:60000\fP. +.TP +\fB-t <ttl>\fP +Set the multicast threshold (aka time to live). +.TP +\fB-V\fP +Print the version information. +.TP +\fB-d\fP +Enable debugging output. 
+ +.SH SEE ALSO +ccs(7), cman(5), ccs_tool(8), ccs_test(8), cluster.conf(5) + diff --git a/config/libs/libccscompat/Makefile b/config/libs/libccscompat/Makefile index e8ecae6..dd7db01 100644 --- a/config/libs/libccscompat/Makefile +++ b/config/libs/libccscompat/Makefile @@ -11,5 +11,5 @@ include $(OBJDIR)/make/uninstall.mk CFLAGS += -D_FILE_OFFSET_BITS=64 CFLAGS += -fPIC -CFLAGS += -I$(SRCDIR)/ccs/daemon +CFLAGS += -I$(SRCDIR)/config/daemons/ccsd CFLAGS += -I${incdir} diff --git a/config/tools/ccs_tool/Makefile b/config/tools/ccs_tool/Makefile index b696c4e..be8805e 100644 --- a/config/tools/ccs_tool/Makefile +++ b/config/tools/ccs_tool/Makefile @@ -6,11 +6,7 @@ SBINSYMT = $(TARGET2) include ../../../make/defines.mk -ifdef legacy_code -all: depends ${TARGET1} ${TARGET2} -else all: ${TARGET1} ${TARGET2} -endif include $(OBJDIR)/make/cobj.mk include $(OBJDIR)/make/clean.mk @@ -26,7 +22,7 @@ endif CFLAGS += -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE CFLAGS += -I${cmanincdir} `xml2-config --cflags` -CFLAGS += -I${ccsincdir} -I$(SRCDIR)/ccs/daemon +CFLAGS += -I${ccsincdir} -I$(SRCDIR)/config/daemons/ccsd CFLAGS += -I$(SRCDIR)/config/libs/libccscompat CFLAGS += -I${incdir} @@ -46,8 +42,9 @@ ${TARGET1}: ${OBJS} ${LDDEPS} ${TARGET2}: ${TARGET1} ln -sf ${TARGET1} ${TARGET2} -ifdef legacy_code depends: + $(MAKE) -C $(OBJDIR)/cman/lib all +ifdef legacy_code $(MAKE) -C $(OBJDIR)/config/libs/libccscompat all endif diff --git a/make/defines.mk.input b/make/defines.mk.input index 353f338..0f62d7d 100644 --- a/make/defines.mk.input +++ b/make/defines.mk.input @@ -73,7 +73,6 @@ enable_xen ?= @ENABLE_XEN@ without_gnbd-kernel/src ?= @DISABLE_GNBDKERNEL@ without_gfs-kernel/src/gfs ?= @DISABLE_GFSKERNEL@ without_cman/lib ?= @DISABLE_CMAN@ -without_ccs ?= @DISABLE_CCS@ without_cman ?= @DISABLE_CMAN@ without_dlm ?= @DISABLE_DLM@ without_group ?= @DISABLE_GROUP@