mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 18:55:19 +00:00 
			
		
		
		
	Properly queue files with inotify(7).
(closes issue #18089) Reported by: abelbeck Patches: 20101021__issue18089.diff.txt uploaded by tilghman (license 14) Tested by: tilghman git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/1.8@294569 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
		
							
								
								
									
										150
									
								
								pbx/pbx_spool.c
									
									
									
									
									
								
							
							
						
						
									
										150
									
								
								pbx/pbx_spool.c
									
									
									
									
									
								
							| @@ -26,11 +26,6 @@ | ||||
|  | ||||
| ASTERISK_FILE_VERSION(__FILE__, "$Revision$") | ||||
|  | ||||
| /* Handling of call files using inotify is not functioning correctly currently: | ||||
|  * Issue 18089 - https://issues.asterisk.org/view.php?id=18089  | ||||
|  */ | ||||
| #undef HAVE_INOTIFY | ||||
|  | ||||
| #include <sys/stat.h> | ||||
| #include <time.h> | ||||
| #include <utime.h> | ||||
| @@ -67,7 +62,7 @@ enum { | ||||
| 	 */ | ||||
| 	SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), | ||||
| 	/* Don't unlink the call file after processing, move in qdonedir */ | ||||
| 	SPOOL_FLAG_ARCHIVE = (1 << 1) | ||||
| 	SPOOL_FLAG_ARCHIVE = (1 << 1), | ||||
| }; | ||||
|  | ||||
| static char qdir[255]; | ||||
| @@ -98,6 +93,10 @@ struct outgoing { | ||||
| 	struct ast_flags options;                 /*!< options */ | ||||
| }; | ||||
|  | ||||
| #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) | ||||
| static void queue_file(const char *filename, time_t when); | ||||
| #endif | ||||
|  | ||||
| static int init_outgoing(struct outgoing *o) | ||||
| { | ||||
| 	o->priority = 1; | ||||
| @@ -260,24 +259,20 @@ static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f) | ||||
|  | ||||
| static void safe_append(struct outgoing *o, time_t now, char *s) | ||||
| { | ||||
| 	int fd; | ||||
| 	FILE *f; | ||||
| 	struct utimbuf tbuf; | ||||
| 	struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime }; | ||||
|  | ||||
| 	if ((fd = open(o->fn, O_WRONLY | O_APPEND)) < 0) | ||||
| 		return; | ||||
| 	ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s); | ||||
|  | ||||
| 	if ((f = fdopen(fd, "a"))) { | ||||
| 	if ((f = fopen(o->fn, "a"))) { | ||||
| 		fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now); | ||||
| 		fclose(f); | ||||
| 	} else | ||||
| 		close(fd); | ||||
| 	} | ||||
|  | ||||
| 	/* Update the file time */ | ||||
| 	tbuf.actime = now; | ||||
| 	tbuf.modtime = now + o->retrytime; | ||||
| 	if (utime(o->fn, &tbuf)) | ||||
| 	if (utime(o->fn, &tbuf)) { | ||||
| 		ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno)); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /*! | ||||
| @@ -288,7 +283,6 @@ static void safe_append(struct outgoing *o, time_t now, char *s) | ||||
|  */ | ||||
| static int remove_from_queue(struct outgoing *o, const char *status) | ||||
| { | ||||
| 	int fd; | ||||
| 	FILE *f; | ||||
| 	char newfn[256]; | ||||
| 	const char *bname; | ||||
| @@ -297,8 +291,9 @@ static int remove_from_queue(struct outgoing *o, const char *status) | ||||
| 		struct stat current_file_status; | ||||
|  | ||||
| 		if (!stat(o->fn, ¤t_file_status)) { | ||||
| 			if (time(NULL) < current_file_status.st_mtime) | ||||
| 			if (time(NULL) < current_file_status.st_mtime) { | ||||
| 				return 0; | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -313,26 +308,28 @@ static int remove_from_queue(struct outgoing *o, const char *status) | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	if ((fd = open(o->fn, O_WRONLY | O_APPEND))) { | ||||
| 		if ((f = fdopen(fd, "a"))) { | ||||
| 			fprintf(f, "Status: %s\n", status); | ||||
| 			fclose(f); | ||||
| 		} else | ||||
| 			close(fd); | ||||
| 	if (!(bname = strrchr(o->fn, '/'))) { | ||||
| 		bname = o->fn; | ||||
| 	} else { | ||||
| 		bname++; | ||||
| 	} | ||||
|  | ||||
| 	if (!(bname = strrchr(o->fn, '/'))) | ||||
| 		bname = o->fn; | ||||
| 	else | ||||
| 		bname++;	 | ||||
| 	snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname); | ||||
| 	/* a existing call file the archive dir is overwritten */ | ||||
| 	unlink(newfn); | ||||
| 	if (rename(o->fn, newfn) != 0) { | ||||
| 		unlink(o->fn); | ||||
| 		return -1; | ||||
| 	} else | ||||
| 		return 0; | ||||
| 	} | ||||
|  | ||||
| 	/* Only append to the file AFTER we move it out of the watched directory, | ||||
| 	 * otherwise the fclose() causes another event for inotify(7) */ | ||||
| 	if ((f = fopen(newfn, "a"))) { | ||||
| 		fprintf(f, "Status: %s\n", status); | ||||
| 		fclose(f); | ||||
| 	} | ||||
|  | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| static void *attempt_thread(void *data) | ||||
| @@ -357,6 +354,9 @@ static void *attempt_thread(void *data) | ||||
| 		} else { | ||||
| 			/* Notate that the call is still active */ | ||||
| 			safe_append(o, time(NULL), "EndRetry"); | ||||
| #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) | ||||
| 			queue_file(o->fn, time(NULL) + o->retrytime); | ||||
| #endif | ||||
| 		} | ||||
| 	} else { | ||||
| 		ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest); | ||||
| @@ -399,10 +399,12 @@ static int scan_service(const char *fn, time_t now) | ||||
| 	} | ||||
|  | ||||
| 	/* Attempt to open the file */ | ||||
| 	if (!(f = fopen(fn, "r+"))) { | ||||
| 	if (!(f = fopen(fn, "r"))) { | ||||
| 		remove_from_queue(o, "Failed"); | ||||
| 		free_outgoing(o); | ||||
| #if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE) | ||||
| 		ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno)); | ||||
| #endif | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| @@ -453,8 +455,9 @@ struct direntry { | ||||
| 	char name[0]; | ||||
| }; | ||||
|  | ||||
| static AST_LIST_HEAD_STATIC(dirlist, direntry); | ||||
| /* Only one thread is accessing this list, so no lock is necessary */ | ||||
| static AST_LIST_HEAD_NOLOCK_STATIC(dirlist, direntry); | ||||
| static AST_LIST_HEAD_NOLOCK_STATIC(createlist, direntry); | ||||
|  | ||||
| static void queue_file(const char *filename, time_t when) | ||||
| { | ||||
| @@ -482,17 +485,18 @@ static void queue_file(const char *filename, time_t when) | ||||
| 		when = st.st_mtime; | ||||
| 	} | ||||
|  | ||||
| #ifndef HAVE_INOTIFY | ||||
| 	/* Need to check the existing list for kqueue(2), in order to avoid duplicates. */ | ||||
| 	/* Need to check the existing list in order to avoid duplicates. */ | ||||
| 	AST_LIST_LOCK(&dirlist); | ||||
| 	AST_LIST_TRAVERSE(&dirlist, cur, list) { | ||||
| 		if (cur->mtime == when && !strcmp(filename, cur->name)) { | ||||
| 			AST_LIST_UNLOCK(&dirlist); | ||||
| 			return; | ||||
| 		} | ||||
| 	} | ||||
| #endif | ||||
|  | ||||
| 	if ((res = when) > now || (res = scan_service(filename, now)) > 0) { | ||||
| 		if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) { | ||||
| 			AST_LIST_UNLOCK(&dirlist); | ||||
| 			return; | ||||
| 		} | ||||
| 		new->mtime = res; | ||||
| @@ -515,8 +519,43 @@ static void queue_file(const char *filename, time_t when) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	AST_LIST_UNLOCK(&dirlist); | ||||
| } | ||||
|  | ||||
| #ifdef HAVE_INOTIFY | ||||
| static void queue_file_create(const char *filename) | ||||
| { | ||||
| 	struct direntry *cur; | ||||
|  | ||||
| 	AST_LIST_TRAVERSE(&createlist, cur, list) { | ||||
| 		if (!strcmp(cur->name, filename)) { | ||||
| 			return; | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) { | ||||
| 		return; | ||||
| 	} | ||||
| 	strcpy(cur->name, filename); | ||||
| 	AST_LIST_INSERT_TAIL(&createlist, cur, list); | ||||
| } | ||||
|  | ||||
| static void queue_file_write(const char *filename) | ||||
| { | ||||
| 	struct direntry *cur; | ||||
| 	/* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */ | ||||
| 	AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { | ||||
| 		if (!strcmp(cur->name, filename)) { | ||||
| 			AST_LIST_REMOVE_CURRENT(list); | ||||
| 			ast_free(cur); | ||||
| 			queue_file(filename, 0); | ||||
| 			break; | ||||
| 		} | ||||
| 	} | ||||
| 	AST_LIST_TRAVERSE_SAFE_END | ||||
| } | ||||
| #endif | ||||
|  | ||||
| static void *scan_thread(void *unused) | ||||
| { | ||||
| 	DIR *dir; | ||||
| @@ -524,15 +563,10 @@ static void *scan_thread(void *unused) | ||||
| 	time_t now; | ||||
| 	struct timespec ts = { .tv_sec = 1 }; | ||||
| #ifdef HAVE_INOTIFY | ||||
| 	int res; | ||||
| 	ssize_t res; | ||||
| 	int inotify_fd = inotify_init(); | ||||
| 	struct { | ||||
| 		struct inotify_event iev; | ||||
| 		/* It may not look like we're using this element, but when we read | ||||
| 		 * from inotify_fd, the event is typically larger than the first | ||||
| 		 * struct, and overflows into this second one. */ | ||||
| 		char name[FILENAME_MAX + 1]; | ||||
| 	} buf; | ||||
| 	struct inotify_event *iev; | ||||
| 	char buf[8192] __attribute__((aligned (sizeof(int)))); | ||||
| 	struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN }; | ||||
| #else | ||||
| 	struct timespec nowait = { 0, 1 }; | ||||
| @@ -557,7 +591,7 @@ static void *scan_thread(void *unused) | ||||
| 	} | ||||
|  | ||||
| #ifdef HAVE_INOTIFY | ||||
| 	inotify_add_watch(inotify_fd, qdir, IN_CLOSE_WRITE | IN_MOVED_TO); | ||||
| 	inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_CLOSE_WRITE | IN_MOVED_TO); | ||||
| #endif | ||||
|  | ||||
| 	/* First, run through the directory and clear existing entries */ | ||||
| @@ -567,7 +601,7 @@ static void *scan_thread(void *unused) | ||||
| 	} | ||||
|  | ||||
| #ifndef HAVE_INOTIFY | ||||
| 	EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE, NOTE_WRITE | NOTE_EXTEND | NOTE_DELETE | NOTE_REVOKE | NOTE_ATTRIB, 0, NULL); | ||||
| 	EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL); | ||||
| 	if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) { | ||||
| 		ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno)); | ||||
| 	} | ||||
| @@ -595,17 +629,23 @@ static void *scan_thread(void *unused) | ||||
| 			int waittime = next == INT_MAX ? -1 : (next - now) * 1000; | ||||
| 			/* When a file arrives, add it to the queue, in mtime order. */ | ||||
| 			if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) && | ||||
| 				(res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(buf.iev)) { | ||||
| 				(res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) { | ||||
| 				ssize_t len = 0; | ||||
| 				/* File(s) added to directory, add them to my list */ | ||||
| 				do { | ||||
| 					queue_file(buf.iev.name, 0); | ||||
| 					res -= sizeof(buf.iev) + buf.iev.len; | ||||
| 					if (res >= sizeof(buf.iev)) { | ||||
| 						memmove(&buf.iev, &buf.iev.name[buf.iev.len], res); | ||||
| 						continue; | ||||
| 				for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) { | ||||
| 					if (iev->mask & IN_CREATE) { | ||||
| 						queue_file_create(iev->name); | ||||
| 					} else if (iev->mask & IN_CLOSE_WRITE) { | ||||
| 						queue_file_write(iev->name); | ||||
| 					} else if (iev->mask & IN_MOVED_TO) { | ||||
| 						queue_file(iev->name, 0); | ||||
| 					} else { | ||||
| 						ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name); | ||||
| 					} | ||||
| 					break; | ||||
| 				} while (1); | ||||
|  | ||||
| 					len = sizeof(*iev) + iev->len; | ||||
| 					res -= len; | ||||
| 				} | ||||
| 			} else if (res < 0 && errno != EINTR && errno != EAGAIN) { | ||||
| 				ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno)); | ||||
| 			} | ||||
| @@ -626,11 +666,13 @@ static void *scan_thread(void *unused) | ||||
| 		} | ||||
|  | ||||
| 		/* Empty the list of all entries ready to be processed */ | ||||
| 		AST_LIST_LOCK(&dirlist); | ||||
| 		while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) { | ||||
| 			cur = AST_LIST_REMOVE_HEAD(&dirlist, list); | ||||
| 			queue_file(cur->name, cur->mtime); | ||||
| 			ast_free(cur); | ||||
| 		} | ||||
| 		AST_LIST_UNLOCK(&dirlist); | ||||
| 	} | ||||
| 	return NULL; | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user