diff --git a/docs/manual/logs.xml b/docs/manual/logs.xml index fcfbc75..1057eca 100644 --- a/docs/manual/logs.xml +++ b/docs/manual/logs.xml @@ -614,6 +614,26 @@ CustomLog "|$/usr/local/apache/bin/rotatelogs /var/log/access_log 86400" commo "||" is also supported and equivalent to using "|".

+

Depending on your OS, there may be a pipe buffer size limit + for safe, atomic writes to a pipe. Log records that exceed that + buffer limit may be interleaved and corrupted. + Use "|<" (mnemonic: split pipe) to split log + records across the pipe so they may be reassembled by the piped + program to preserve their integrity. The program receiving the + records must know how to reassemble the records; currently, + rotatelogs supports this. When used, a variable + (AP_MOD_LOG_CONFIG_CHUNKED_MSG) is exported to the environment + so the receiving program can take appropriate action. This option + may be used in conjunction with the shell option, as + "|<$" (or for compatibility "||<"). + This option is not available for ErrorLog directives; if it is + present, a warning will be emitted.

+ + +# Invoke "rotatelogs" with a safe (atomic split) pipe +CustomLog "|</usr/local/apache/bin/rotatelogs /var/log/access_log 86400" common + + Windows note

Note that on Windows, you may run into problems when running many piped logger processes, especially when HTTPD is running as a service. This is diff --git a/docs/manual/mod/mod_log_config.xml b/docs/manual/mod/mod_log_config.xml index 4e2297e..3edd6f5 100644 --- a/docs/manual/mod/mod_log_config.xml +++ b/docs/manual/mod/mod_log_config.xml @@ -476,7 +476,8 @@ expr=expression] >ServerRoot.

pipe
-
The pipe character "|", followed by the path +
The pipe character "|" (or safe pipe + "|<"), followed by the path to a program to receive the log information on its standard input. See the notes on piped logs for more information. diff --git a/include/http_log.h b/include/http_log.h index 31ee49b..6ff659f 100644 --- a/include/http_log.h +++ b/include/http_log.h @@ -282,6 +282,25 @@ AP_DECLARE_DATA extern int ap_default_loglevel; */ #define APLOG_MARK __FILE__,__LINE__,APLOG_MODULE_INDEX +/* POSIX.1 defines PIPE_BUF as the maximum number of bytes that is + * guaranteed to be atomic when writing a pipe. And PIPE_BUF >= 512 + * is guaranteed. So we'll just guess 512 in the event the system + * doesn't have this. Now, for file writes there is actually no limit, + * the entire write is atomic. Whether all systems implement this + * correctly is another question entirely ... so we'll just use PIPE_BUF + * because it's probably a good guess as to what is implemented correctly + * everywhere. + */ +#ifdef PIPE_BUF +#define LOG_BUFSIZE PIPE_BUF +#else +#define LOG_BUFSIZE (512) +#endif + +#define MSG_HEADER_ELT_CNT 5 +#define MSG_HEADER_LEN (sizeof(uint32_t) * MSG_HEADER_ELT_CNT) +#define LOG_BUFSIZE_LESS_CHUNKHEAD (LOG_BUFSIZE - MSG_HEADER_LEN) + /** * Set up for logging to stderr. 
* @param p The pool to allocate out of diff --git a/include/scoreboard.h b/include/scoreboard.h index 015aec9..1befbce 100644 --- a/include/scoreboard.h +++ b/include/scoreboard.h @@ -157,6 +157,10 @@ typedef struct { } scoreboard; typedef struct ap_sb_handle_t ap_sb_handle_t; +struct ap_sb_handle_t { + int child_num; + int thread_num; +}; /* * Creation and deletion (internal) diff --git a/modules/loggers/mod_log_config.c b/modules/loggers/mod_log_config.c index 6f9201d..8c7abff 100644 --- a/modules/loggers/mod_log_config.c +++ b/modules/loggers/mod_log_config.c @@ -151,8 +151,10 @@ #include "apr_hash.h" #include "apr_optional.h" #include "apr_anylock.h" +#include "apr_env.h" #define APR_WANT_STRFUNC +#define APR_WANT_BYTEFUNC /* for htons() et al */ #include "apr_want.h" #include "ap_config.h" @@ -165,6 +167,7 @@ #include "util_time.h" #include "ap_mpm.h" #include "ap_provider.h" +#include "scoreboard.h" #if APR_HAVE_UNISTD_H #include @@ -178,25 +181,39 @@ module AP_MODULE_DECLARE_DATA log_config_module; +typedef struct { + apr_file_t *handle; + apr_size_t outcnt; + char outbuf[LOG_BUFSIZE]; + apr_anylock_t mutex; +} buffered_log; + static int xfer_flags = (APR_WRITE | APR_APPEND | APR_CREATE | APR_LARGEFILE); static apr_fileperms_t xfer_perms = APR_OS_DEFAULT; static apr_hash_t *log_hash; +static apr_status_t write_atomic_pipe_chunks(request_rec *r, + void *handle, + const char *str, + apr_size_t len, + buffered_log *buf); static apr_status_t ap_default_log_writer(request_rec *r, void *handle, const char **strs, int *strl, int nelts, - apr_size_t len); + apr_size_t len, + int chunk_msgs); static apr_status_t ap_buffered_log_writer(request_rec *r, void *handle, const char **strs, int *strl, int nelts, - apr_size_t len); + apr_size_t len, + int chunk_msgs); static void *ap_default_log_writer_init(apr_pool_t *p, server_rec *s, - const char* name); + config_log_state* cls); static void *ap_buffered_log_writer_init(apr_pool_t *p, server_rec *s, - const char* name); + 
config_log_state* cls); static ap_log_writer_init *ap_log_set_writer_init(ap_log_writer_init *handle); static ap_log_writer *ap_log_set_writer(ap_log_writer *handle); @@ -205,21 +222,6 @@ static ap_log_writer_init *log_writer_init = ap_default_log_writer_init; static int buffered_logs = 0; /* default unbuffered */ static apr_array_header_t *all_buffered_logs = NULL; -/* POSIX.1 defines PIPE_BUF as the maximum number of bytes that is - * guaranteed to be atomic when writing a pipe. And PIPE_BUF >= 512 - * is guaranteed. So we'll just guess 512 in the event the system - * doesn't have this. Now, for file writes there is actually no limit, - * the entire write is atomic. Whether all systems implement this - * correctly is another question entirely ... so we'll just use PIPE_BUF - * because it's probably a good guess as to what is implemented correctly - * everywhere. - */ -#ifdef PIPE_BUF -#define LOG_BUFSIZE PIPE_BUF -#else -#define LOG_BUFSIZE (512) -#endif - /* * multi_log_state is our per-(virtual)-server configuration. We store * an array of the logs we are going to use, each of type config_log_state. @@ -253,14 +255,7 @@ typedef struct { * set to a opaque structure (usually a fd) after it is opened. 
*/ -typedef struct { - apr_file_t *handle; - apr_size_t outcnt; - char outbuf[LOG_BUFSIZE]; - apr_anylock_t mutex; -} buffered_log; - -typedef struct { +struct config_log_state { const char *fname; const char *format_string; apr_array_header_t *format; @@ -270,7 +265,8 @@ typedef struct { ap_expr_info_t *condition_expr; /** place of definition or NULL if already checked */ const ap_directive_t *directive; -} config_log_state; + int chunked; +}; /* * log_request_state holds request specific log data that is not @@ -1212,7 +1208,7 @@ static int config_log_transaction(request_rec *r, config_log_state *cls, "log writer isn't correctly setup"); return HTTP_INTERNAL_SERVER_ERROR; } - rv = log_writer(r, cls->log_writer, strs, strl, format->nelts, len); + rv = log_writer(r, cls->log_writer, strs, strl, format->nelts, len, cls->chunked); if (rv != APR_SUCCESS) { ap_log_rerror(APLOG_MARK, APLOG_WARNING, rv, r, APLOGNO(00646) "Error writing to %s", cls->fname); @@ -1364,6 +1360,9 @@ static const char *add_custom_log(cmd_parms *cmd, void *dummy, const char *fn, } cls->fname = fn; + if (fn && (!strncmp(fn, "|<", 2) || !strncmp(fn, "||<", 3))) { + cls->chunked = 1; + } cls->format_string = fmt; cls->directive = cmd->directive; if (fmt == NULL) { @@ -1423,6 +1422,7 @@ static const char *set_buffered_logs_on(cmd_parms *parms, void *dummy, int flag) } return NULL; } + static const command_rec config_log_cmds[] = { AP_INIT_TAKE23("CustomLog", add_custom_log, NULL, RSRC_CONF, @@ -1451,7 +1451,7 @@ static config_log_state *open_config_log(server_rec *s, apr_pool_t *p, return cls; /* Leave it NULL to decline. 
*/ } - cls->log_writer = log_writer_init(p, s, cls->fname); + cls->log_writer = log_writer_init(p, s, cls); if (cls->log_writer == NULL) return NULL; @@ -1640,12 +1640,87 @@ static ap_log_writer *ap_log_set_writer(ap_log_writer *handle) return old; } +static apr_status_t write_atomic_pipe_chunks(request_rec *r, + void *handle, + const char *str, + apr_size_t len, + buffered_log *buf) +{ + apr_status_t rv = APR_SUCCESS; /* buffered path performs no write */ + apr_size_t chunk_body_len; + apr_size_t write_cnt; + apr_size_t msg_read_len = 0; + uint32_t proc_slot_xfer, thread_slot_xfer; + char msg_chunk[LOG_BUFSIZE]; + uint32_t chunk_order = 1; + uint32_t chunk_order_xfer; + uint32_t total_chunk_cnt = (len / LOG_BUFSIZE_LESS_CHUNKHEAD) + + !!(len % LOG_BUFSIZE_LESS_CHUNKHEAD); + uint32_t total_chunk_cnt_xfer = htonl(total_chunk_cnt); + uint32_t chunk_body_len_xfer; + ap_sb_handle_t *sbh = r->connection->sbh; + uint32_t cpy_pos = 0; + char *msg_buf = msg_chunk; + + if (buf != NULL) + msg_buf = &buf->outbuf[buf->outcnt]; + + proc_slot_xfer = htonl(sbh->child_num); + thread_slot_xfer = htonl(sbh->thread_num); + + /* + * Split the log line into PIPE_BUF chunks to avoid interleaving concurrent + * writes to the pipe that may corrupt the log data. + */ + do { + if (buf == NULL) + cpy_pos = 0; + chunk_body_len = len - msg_read_len; + if (chunk_body_len > LOG_BUFSIZE_LESS_CHUNKHEAD) + chunk_body_len = LOG_BUFSIZE_LESS_CHUNKHEAD; + memcpy(msg_buf + cpy_pos, &proc_slot_xfer, sizeof(uint32_t)); + cpy_pos += sizeof(uint32_t); + memcpy(msg_buf + cpy_pos, &thread_slot_xfer, sizeof(uint32_t)); + cpy_pos += sizeof(uint32_t); + chunk_order_xfer = htonl(chunk_order); + memcpy(msg_buf + cpy_pos, &chunk_order_xfer, sizeof(uint32_t)); + cpy_pos += sizeof(uint32_t); + memcpy(msg_buf + cpy_pos, &total_chunk_cnt_xfer, sizeof(uint32_t)); + cpy_pos += sizeof(uint32_t); + + /* Convert potentially long int to message header uint32_t.
*/ + chunk_body_len_xfer = htonl(chunk_body_len); + memcpy(msg_buf + cpy_pos, &chunk_body_len_xfer, sizeof(uint32_t)); + cpy_pos += sizeof(uint32_t); + memcpy(msg_buf + cpy_pos, str + msg_read_len, chunk_body_len); + cpy_pos += chunk_body_len; + + if (buf == NULL) { + write_cnt = chunk_body_len + MSG_HEADER_LEN; + if ((rv = apr_file_write((apr_file_t*)handle, msg_buf, &write_cnt)) + != APR_SUCCESS) { + return rv; + } + } + + msg_read_len += chunk_body_len; + chunk_order++; + + } while (msg_read_len < len); + + if (buf != NULL) + buf->outcnt += cpy_pos; + + return rv; +} + static apr_status_t ap_default_log_writer( request_rec *r, void *handle, const char **strs, int *strl, int nelts, - apr_size_t len) + apr_size_t len, + int chunk_msgs) { default_log_writer *log_writer = handle; @@ -1666,8 +1741,14 @@ static apr_status_t ap_default_log_writer( request_rec *r, } if (log_writer->type == LOG_WRITER_FD) { - rv = apr_file_write_full((apr_file_t*)log_writer->log_writer, str, - len, NULL); + if (chunk_msgs) { + rv = write_atomic_pipe_chunks(r, log_writer->log_writer, str, len, + NULL); + } + else { + rv = apr_file_write_full((apr_file_t*)log_writer->log_writer, str, + len, NULL); + } } else { errorlog_provider_data *data = log_writer->log_writer; @@ -1689,8 +1770,10 @@ static apr_status_t ap_default_log_writer( request_rec *r, return rv; } static void *ap_default_log_writer_init(apr_pool_t *p, server_rec *s, - const char* name) + config_log_state* cls) { + apr_status_t rv; + const char *name = cls->fname; default_log_writer *log_writer; const char *provider_name = name; ap_errorlog_provider *provider = NULL; @@ -1706,6 +1789,23 @@ static void *ap_default_log_writer_init(apr_pool_t *p, server_rec *s, if (*name == '|') { piped_log *pl; + if (cls->chunked) { + if ((rv = apr_env_set("AP_MOD_LOG_CONFIG_CHUNKED_MSG", "", p)) != + APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, s, APLOGNO(035011) + "Unable to apr_env_set"); + return NULL; + } + /* + * Skip over the 
initial '|' because we know a '<' follows + * somewhere, which will be skipped further below. + */ + ++name; + + /* To support legacy syntax '||', which has no other effect. */ + if (*name == '|') + ++name; + } pl = ap_open_piped_log(p, name + 1); if (pl == NULL) { return NULL; @@ -1763,11 +1863,11 @@ static void *ap_default_log_writer_init(apr_pool_t *p, server_rec *s, } } static void *ap_buffered_log_writer_init(apr_pool_t *p, server_rec *s, - const char* name) + config_log_state* cls) { buffered_log *b; b = apr_pcalloc(p, sizeof(buffered_log)); - b->handle = ap_default_log_writer_init(p, s, name); + b->handle = ap_default_log_writer_init(p, s, cls); if (b->handle) { *(buffered_log **)apr_array_push(all_buffered_logs) = b; @@ -1781,23 +1881,29 @@ static apr_status_t ap_buffered_log_writer(request_rec *r, const char **strs, int *strl, int nelts, - apr_size_t len) + apr_size_t len, + int chunk_msgs) { char *str; char *s; int i; apr_status_t rv; + apr_size_t buf_len_limit = LOG_BUFSIZE; buffered_log *buf = (buffered_log*)handle; if ((rv = APR_ANYLOCK_LOCK(&buf->mutex)) != APR_SUCCESS) { return rv; } - if (len + buf->outcnt > LOG_BUFSIZE) { + if (chunk_msgs) { + buf_len_limit = LOG_BUFSIZE_LESS_CHUNKHEAD; + } + + if (len + buf->outcnt > buf_len_limit) { flush_log(buf); } - if (len >= LOG_BUFSIZE) { + if (len >= buf_len_limit) { apr_size_t w; /* @@ -1809,17 +1915,38 @@ static apr_status_t ap_buffered_log_writer(request_rec *r, memcpy(s, strs[i], strl[i]); s += strl[i]; } - w = len; - rv = apr_file_write_full(buf->handle, str, w, NULL); + + if (chunk_msgs) { + rv = write_atomic_pipe_chunks(r, buf->handle, str, len, NULL); + } + else { + w = len; + rv = apr_file_write_full(buf->handle, str, w, NULL); + } } else { - for (i = 0, s = &buf->outbuf[buf->outcnt]; i < nelts; ++i) { - memcpy(s, strs[i], strl[i]); - s += strl[i]; + if (chunk_msgs) { + /* + * We do this memcpy dance because write() is atomic for + * len < PIPE_BUF, while writev() need not be. 
+ */ + str = apr_palloc(r->pool, len + 1); + for (i = 0, s = str; i < nelts; ++i) { + memcpy(s, strs[i], strl[i]); + s += strl[i]; + } + + rv = write_atomic_pipe_chunks(r, buf->handle, str, len, buf); + } + else { + for (i = 0, s = &buf->outbuf[buf->outcnt]; i < nelts; ++i) { + memcpy(s, strs[i], strl[i]); + s += strl[i]; + } + buf->outcnt += len; + rv = APR_SUCCESS; } - buf->outcnt += len; - rv = APR_SUCCESS; } APR_ANYLOCK_UNLOCK(&buf->mutex); diff --git a/modules/loggers/mod_log_config.h b/modules/loggers/mod_log_config.h index 877a593..156c9b6 100644 --- a/modules/loggers/mod_log_config.h +++ b/modules/loggers/mod_log_config.h @@ -30,6 +30,8 @@ #ifndef _MOD_LOG_CONFIG_H #define _MOD_LOG_CONFIG_H 1 +typedef struct config_log_state config_log_state; + /** * callback function prototype for a external log handler */ @@ -39,7 +41,7 @@ typedef const char *ap_log_handler_fn_t(request_rec *r, char *a); * callback function prototype for external writer initialization. */ typedef void *ap_log_writer_init(apr_pool_t *p, server_rec *s, - const char *name); + config_log_state* cls); /** * callback which gets called where there is a log line to write. 
*/ @@ -49,7 +51,8 @@ typedef apr_status_t ap_log_writer( const char **portions, int *lengths, int nelts, - apr_size_t len); + apr_size_t len, + int chunk_msgs); typedef struct ap_log_handler { ap_log_handler_fn_t *func; diff --git a/server/log.c b/server/log.c index 11b8237..b48a249 100644 --- a/server/log.c +++ b/server/log.c @@ -313,6 +313,12 @@ static int open_error_log(server_rec *s, int is_main, apr_pool_t *p) */ if (*fname == '|') ++fname; + if (*fname == '<') { + ap_log_error(APLOG_MARK, APLOG_STARTUP, 0, NULL, APLOGNO(035010) + "Safe pipe '<' not supported for ErrorLog: '%s'.", + s->error_fname); + ++fname; + } if (*fname == '$') { cmdtype = APR_SHELLCMD_ENV; ++fname; diff --git a/server/scoreboard.c b/server/scoreboard.c index 4eb1b05..0a2e86e 100644 --- a/server/scoreboard.c +++ b/server/scoreboard.c @@ -104,11 +104,6 @@ AP_IMPLEMENT_HOOK_RUN_ALL(int,pre_mpm, static APR_OPTIONAL_FN_TYPE(ap_logio_get_last_bytes) *pfn_ap_logio_get_last_bytes; -struct ap_sb_handle_t { - int child_num; - int thread_num; -}; - static int server_limit, thread_limit; static apr_size_t scoreboard_size; diff --git a/support/rotatelogs.c b/support/rotatelogs.c index b5c0e89..2c0e06c 100644 --- a/support/rotatelogs.c +++ b/support/rotatelogs.c @@ -25,6 +25,9 @@ #include "apr_getopt.h" #include "apr_thread_proc.h" #include "apr_signal.h" +#include "apr_env.h" +#include "apr_hash.h" +#include "apr_tables.h" #if APR_FILES_AS_SOCKETS #include "apr_poll.h" #endif @@ -33,8 +36,12 @@ #include #endif #define APR_WANT_STRFUNC +#define APR_WANT_BYTEFUNC /* for htons() et al */ #include "apr_want.h" +#include "httpd.h" +#include "http_log.h" + #define BUFSIZE 65536 #define ROTATE_NONE 0 @@ -556,6 +563,134 @@ static const char *get_time_or_size(rotate_config_t *config, return NULL; } +static apr_status_t read_chunk(apr_file_t *f_stdin, apr_pool_t *p, char *buf, + char **msg, apr_hash_t *thread_chunks, + apr_size_t *nRead) +{ + apr_status_t rv; + uint32_t chunk_header[MSG_HEADER_ELT_CNT]; + 
uint32_t proc_slot, thd_slot, chunk_cnt, tot_chunk_cnt, chunk_body_len; + + /* Read just the chunk header, expected to be present in its entirety. */ + rv = apr_file_read_full(f_stdin, chunk_header, MSG_HEADER_LEN, nRead); + if (APR_STATUS_IS_EOF(rv)) { + return rv; + } + else if (rv != APR_SUCCESS) { + exit(3); + } + proc_slot = ntohl(chunk_header[0]); + thd_slot = ntohl(chunk_header[1]); + chunk_cnt = ntohl(chunk_header[2]); + tot_chunk_cnt = ntohl(chunk_header[3]); + chunk_body_len = ntohl(chunk_header[4]); + + /* + * The body is read into the caller's fixed-size buffer and reassembly + * assumes every non-final chunk is completely full, so reject a header + * the writer could never have produced instead of overrunning memory. + */ + if (chunk_body_len > LOG_BUFSIZE_LESS_CHUNKHEAD + || chunk_body_len > BUFSIZE + || (chunk_cnt < tot_chunk_cnt + && chunk_body_len != LOG_BUFSIZE_LESS_CHUNKHEAD)) { + fprintf(stderr, "Malformed chunk header on stdin\n"); + exit(3); + } + + /* Read the full chunk body, expected to be present in its entirety. */ + rv = apr_file_read_full(f_stdin, buf, (apr_size_t)chunk_body_len, nRead); + if (APR_STATUS_IS_EOF(rv)) { + return rv; + } + else if (rv != APR_SUCCESS) { + exit(3); + } + + if (chunk_cnt != 1 || tot_chunk_cnt != 1) { + /* + * If this is a multi-chunk log record, stage the chunks so they can be + * reassembled in order. + */ + uint32_t chunk_idx = chunk_cnt - 1; + char *chunk_body; + char **chunk_elem; + char *slot_key; + apr_array_header_t *chunks_head; + + slot_key = apr_psprintf(p, "%i.%i", proc_slot, thd_slot); + chunks_head = apr_hash_get(thread_chunks, slot_key, + APR_HASH_KEY_STRING); + /* A thread must write chunks of a message in order. */ + if (chunks_head == NULL) { + if (chunk_idx > 0) { + /* + * Expected to have a table entry here since we're in + * mid-sequence for this chunk set. + */ + fprintf(stderr, "Chunks found in a non-ordered sequence, " + "skipping this chunk set\n"); + return APR_SUCCESS; + } + chunks_head = apr_array_make(p, 10, sizeof(char *)); + apr_hash_set(thread_chunks, slot_key, APR_HASH_KEY_STRING, + chunks_head); + } + else { + /* Check that the chunk list size jives with chunk_cnt. */ + if (chunk_idx != chunks_head->nelts) { + fprintf(stderr, "Chunks found in a non-ordered sequence, " + "skipping this chunk set\n"); + apr_array_clear(chunks_head); + return APR_SUCCESS; + } + } + + chunk_elem = apr_array_push(chunks_head); + + /* Stage this chunk.
*/ + chunk_body = malloc(chunk_body_len); + memcpy(chunk_body, buf, chunk_body_len); + *chunk_elem = chunk_body; + + if (chunk_cnt == tot_chunk_cnt) { + /* + * If this is the final chunk, glue them together so the original + * message can be written below. + */ + uint32_t chunk_len; + uint32_t chunk_iter; + uint32_t cpy_len = 0; + + *msg = malloc(LOG_BUFSIZE_LESS_CHUNKHEAD * tot_chunk_cnt); + for (chunk_iter = 0; chunk_iter < chunks_head->nelts; + ++chunk_iter) { + char *chunk = ((char**)chunks_head->elts)[chunk_iter]; + chunk_len = LOG_BUFSIZE_LESS_CHUNKHEAD; + if (chunk_iter + 1 == tot_chunk_cnt) + chunk_len = chunk_body_len; + memcpy(*msg + cpy_len, chunk, chunk_len); + cpy_len += chunk_len; + free(chunk); + } + *nRead = cpy_len; + apr_array_clear(chunks_head); + } + else { + /* Otherwise continue staging and writing single-chunk messages. */ + return APR_SUCCESS; + } + } + else { + /* + * This log message is transferred in a single chunk, simply return and + * write it. + */ + *msg = buf; + } + return APR_SUCCESS; +} + int main (int argc, const char * const argv[]) { char buf[BUFSIZE]; @@ -565,8 +687,12 @@ int main (int argc, const char * const argv[]) apr_getopt_t *opt; apr_status_t rv; char c; + char *env; const char *opt_arg; const char *err = NULL; + int chunked_msgs; + apr_hash_t *thread_chunks; + char *msg; #if APR_FILES_AS_SOCKETS apr_pollfd_t pollfd = { 0 }; apr_status_t pollret = APR_SUCCESS; @@ -693,6 +819,11 @@ int main (int argc, const char * const argv[]) } #endif + rv = apr_env_get(&env, "AP_MOD_LOG_CONFIG_CHUNKED_MSG", status.pool); + chunked_msgs = (rv == APR_SUCCESS && env != NULL); + + thread_chunks = apr_hash_make(status.pool); + /* * Immediately open the logfile as we start, if we were forced * to do so via '-f'. 
@@ -702,7 +833,10 @@ int main (int argc, const char * const argv[]) } for (;;) { + int freemsg = 0; + int skip_write = 0; nRead = sizeof(buf); + msg = NULL; #if APR_FILES_AS_SOCKETS if (config.create_empty && config.tRotation) { polltimeout = status.tLogEnd ? status.tLogEnd - get_now(&config, NULL) : config.tRotation; @@ -714,16 +848,28 @@ int main (int argc, const char * const argv[]) } } if (pollret == APR_SUCCESS) { - rv = apr_file_read(f_stdin, buf, &nRead); + if (chunked_msgs) { + rv = read_chunk(f_stdin, status.pool, buf, &msg, + thread_chunks, &nRead); + } + else { + rv = apr_file_read(f_stdin, buf, &nRead); + msg = buf; + } if (APR_STATUS_IS_EOF(rv)) { break; } else if (rv != APR_SUCCESS) { exit(3); } + if (msg == NULL) + skip_write = 1; + else if (msg != buf) + freemsg = 1; } else if (pollret == APR_TIMEUP) { *buf = 0; + msg = buf; nRead = 0; } else { @@ -731,21 +877,35 @@ int main (int argc, const char * const argv[]) exit(5); } #else /* APR_FILES_AS_SOCKETS */ - rv = apr_file_read(f_stdin, buf, &nRead); + if (chunked_msgs) { + rv = read_chunk(f_stdin, status.pool, buf, &msg, + thread_chunks, &nRead); + } + else { + rv = apr_file_read(f_stdin, buf, &nRead); + msg = buf; + } if (APR_STATUS_IS_EOF(rv)) { break; } else if (rv != APR_SUCCESS) { exit(3); } + if (msg == NULL) + skip_write = 1; + else if (msg != buf) + freemsg = 1; #endif /* APR_FILES_AS_SOCKETS */ checkRotate(&config, &status); if (status.rotateReason != ROTATE_NONE) { doRotate(&config, &status); } + if (skip_write) + continue; + nWrite = nRead; - rv = apr_file_write_full(status.current.fd, buf, nWrite, &nWrite); + rv = apr_file_write_full(status.current.fd, msg, nWrite, &nWrite); if (nWrite != nRead) { apr_off_t cur_offset; apr_pool_t *pool; @@ -768,11 +928,13 @@ int main (int argc, const char * const argv[]) status.nMessCount++; } if (config.echo) { - if (apr_file_write_full(f_stdout, buf, nRead, NULL)) { + if (apr_file_write_full(f_stdout, msg, nRead, NULL)) { fprintf(stderr, "Unable to 
write to stdout\n"); exit(4); } } + if (freemsg) + free(msg); } return 0; /* reached only at stdin EOF. */