Hello,

I work at SmartJog.com. We have some patches for Icecast that improve performance and reliability. They are mostly client/connection/source cleanups (a slave merge is underway, and some more good stuff (c)), but we'd like these to be merged before the list gets any longer.

Below is the list of our patches, each group with a short description.

This one is actually not from us/me: it was found on a forum a long time ago and I reworked it a bit. Sorry for the name stealing; author, please make yourself known.
  [PATCH 01/31] Hack to support IPhone streaming

[Thread connection]
  [PATCH 02/31] introduce thread_cond_init and refrase thread_cond_create.

[Get rid of the {WARN,INFO,DEBUG,..}n macros]
  [PATCH 03/31] LOGGING add non arg-counted macros.

[simplify header parsing, inspired by source.c code]
  [PATCH 04/31] UTIL, add find_eos_delim and use it to simplify util_read_header (pending con)
  [PATCH 05/31] connection: make process_request_queue use util_find_eos_delim
  [PATCH 06/31] connection.c: use util_find_eos_delim to simplify _handle_shoutcast_compatible

[cleanup shoutcast code]
  [PATCH 07/31] Connection: make _handle_shoutcast_compatible more readable,

[cleanup generic connection code]
  [PATCH 08/31] connection: _handle_connection re-roll logic in a more readeable way.
  [PATCH 09/31] connection: simplify _handle_connection (make it more readeable)
  [PATCH 10/31] Connection: _accept_connection, simplify logic
  [PATCH 11/31] Connection: connection_accept_loop, pass timeout

[big refactoring]
  [PATCH 12/31] Connection: refactor source_startup
  [PATCH 13/31] Connection: let everything go through fserve

[add post support]
  [PATCH 14/31] Add POST support.
  WARNING, still no AUTH

[re-work connection process, to a more readable and simple way]
  [PATCH 15/31] Connection: simplify in-connection handeling (kill 1 function, and break shoutcast)
  [PATCH 16/31] Connection: extract connection_process
  [PATCH 17/31] Connection: add threads

this needs to go after the client_tag_t obsoleting patch
  [PATCH 18/31] connection_process takes node, con_q_t gets refbuf, and con_t timeout, util updated
  [PATCH 19/31] connection: process takes node, not con+args, cleanup error handeling (propagate)
  [PATCH 20/31] connection: add parser to connection, and use read_headers changes.
  [PATCH 21/31] Add Client in connection_queue_t
  [PATCH 22/31] Fix Shoutcast, Move it to a one-pass process

[tweaks]
  [PATCH 23/31] connection: duration should be bigger
  [PATCH 24/31] connection: reorder logic in connection_setup_sockets
  [PATCH 25/31] client_send_400, print 400 message
  [PATCH 26/31] Connection: handle_client returns err to client (via client_send_400)
  [PATCH 27/31] connection: more coments for remy
  [PATCH 28/31] Big Comments cleanup
  [PATCH 29/31] connection: client_setup, send 403 when we can for error
  [PATCH 30/31] connection: _close set everything to NULL on the way out

[this might be a bug]
  [PATCH 31/31] source: make get_next_buffer try only 10 times and then bail out
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 01/31] Hack to support IPhone streaming
iPhone (CoreMedia) requires that we answer its Range request with a Content-Range header, so create a fake one on the fly.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/format.c     |   26 ++++++++++++++++++------
 src/format_mp3.c |   56 +++++++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/src/format.c b/src/format.c
index 415391c..369dfbd 100644
--- a/src/format.c
+++ b/src/format.c
@@ -3,7 +3,7 @@
  * This program is distributed under the GNU General Public License, version 2.
  * A copy of this license is included with this source.
  *
- * Copyright 2000-2004, Jack Moffitt <jack at xiph.org,
+ * Copyright 2000-2004, Jack Moffitt <jack at xiph.org,
  *                      Michael Smith <msmith at xiph.org>,
  *                      oddsock <oddsock at xiph.org>,
  *                      Karl Heyes <karl at xiph.org>
@@ -85,7 +85,7 @@ int format_get_plugin(format_type_t type, source_t *source)
             break;
     }
     if (ret < 0)
-        stats_event (source->mount, "content-type",
+        stats_event (source->mount, "content-type",
                 source->format->contenttype);

     return ret;
@@ -93,7 +93,7 @@ int format_get_plugin(format_type_t type, source_t *source)

 /* clients need to be start from somewhere in the queue so we will look for
- * a refbuf which has been previously marked as a sync point.
+ * a refbuf which has been previously marked as a sync point.
  */
 static void find_client_start (source_t *source, client_t *client)
 {
@@ -248,7 +248,7 @@ int format_generic_write_to_client (client_t *client)

 /* This is the commonly used for source streams, here we just progress to
- * the next buffer in the queue if there is no more left to be written from
+ * the next buffer in the queue if there is no more left to be written from
  * the existing buffer.
  */
 int format_advance_queue (source_t *source, client_t *client)
@@ -280,12 +280,24 @@ static int format_prepare_headers (source_t *source, client_t *client)
     avl_node *node;
     ice_config_t *config;

+    /* Partial hack, check for range and user agent in this function */
+    const char *useragent;
+    const char *range;
+    useragent = httpp_getvar (client->parser, "user-agent");
+    range = httpp_getvar (client->parser, "range");
+
     remaining = client->refbuf->len;
     ptr = client->refbuf->data;
     client->respcode = 200;

-    bytes = snprintf (ptr, remaining, "HTTP/1.0 200 OK\r\n"
-            "Content-Type: %s\r\n", source->format->contenttype);
+
+    if (range && useragent && strstr(useragent, "CoreMedia")) {
+        bytes = snprintf (ptr, remaining, "HTTP/1.1 206 Partial Content\r\n"
+                "Content-Type: %s\r\n", source->format->contenttype);
+    } else {
+        bytes = snprintf (ptr, remaining, "HTTP/1.0 200 OK\r\n"
+                "Content-Type: %s\r\n", source->format->contenttype);
+    }

     remaining -= bytes;
     ptr += bytes;
@@ -307,7 +319,7 @@ static int format_prepare_headers (source_t *source, client_t *client)
                 if (bitrate_filtered == 0)
                     brfield = strstr(var->value, "bitrate=");
                 if (brfield && sscanf (brfield, "bitrate=%u", &bitrate))
-                {
+                {
                     bytes = snprintf (ptr, remaining, "icy-br:%u\r\n", bitrate);
                     next = 0;
                     bitrate_filtered = 1;
diff --git a/src/format_mp3.c b/src/format_mp3.c
index 7f08e15..2d62314 100644
--- a/src/format_mp3.c
+++ b/src/format_mp3.c
@@ -4,7 +4,7 @@
  * This program is distributed under the GNU General Public License, version 2.
  * A copy of this license is included with this source.
  *
- * Copyright 2000-2004, Jack Moffitt <jack at xiph.org,
+ * Copyright 2000-2004, Jack Moffitt <jack at xiph.org,
  *                      Michael Smith <msmith at xiph.org>,
  *                      oddsock <oddsock at xiph.org>,
  *                      Karl Heyes <karl at xiph.org>
@@ -461,7 +461,7 @@ static void format_mp3_free_plugin(format_plugin_t *self)

 /* This does the actual reading, making sure the read data is packaged in
  * blocks of 1400 bytes (near the common MTU size). This is because many
- * incoming streams come in small packets which could waste a lot of
+ * incoming streams come in small packets which could waste a lot of
  * bandwidth with many listeners due to headers and such like.
  */
 static int complete_read (source_t *source)
@@ -476,7 +476,7 @@ static int complete_read (source_t *source)

     if (source_mp3->read_data == NULL)
     {
-        source_mp3->read_data = refbuf_new (REFBUF_SIZE);
+        source_mp3->read_data = refbuf_new (REFBUF_SIZE);
         source_mp3->read_count = 0;
     }
     buf = source_mp3->read_data->data + source_mp3->read_count;
@@ -603,7 +603,7 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
                 source_mp3->build_metadata_offset += bytes;
                 break;
             }
-            /* copy all bytes except the last one, that way we
+            /* copy all bytes except the last one, that way we
              * know a null byte terminates the message */
             memcpy (source_mp3->build_metadata + source_mp3->build_metadata_offset,
                     src, metadata_remaining-1);
@@ -654,6 +654,42 @@ static refbuf_t *mp3_get_filter_meta (source_t *source)
     return refbuf;
 }

+inline int format_mp3_insert_coremedia_header(client_t *client, char *ptr, int remaining) {
+    int range[2];
+    char *rangestr;
+    int read = 0;
+    int bytes;
+    rangestr = httpp_getvar(client->parser, "range");
+
+    if (rangestr != NULL) {
+        if (sscanf(rangestr, "bytes=%d-%d", &range[0], &range[1]) != 2)
+            return 0;
+        if (range[0] < 0)
+            return 0;
+
+        char currenttime[50];
+        time_t now;
+        int strflen;
+        struct tm result;
+        time(&now);
+        strflen = strftime(currenttime, 50, "%a, %d-%b-%Y %X GMT",
+                gmtime_r(&now, &result));
+        client->respcode = 206;
+        bytes = snprintf(ptr, remaining, "Date: %s\r\n", currenttime);
+        if (bytes > 0){
+            remaining -= bytes;
+            read += bytes;
+        }
+        bytes = snprintf(ptr + read, remaining, "Content-Range: bytes %d-%d/221183499\r\n",
+                range[0], range[1]);
+        if (bytes > 0) {
+            remaining -= bytes;
+            read += bytes;
+        }
+    }
+
+    return read;
+}

 static int format_mp3_create_client_data(source_t *source, client_t *client)
 {
@@ -681,7 +717,17 @@ static int format_mp3_create_client_data(source_t *source, client_t *client)
         /* avoid browser caching, reported via forum */
         bytes = snprintf (ptr, remaining, "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n");
         remaining -= bytes;
-        ptr += bytes;
+        ptr += bytes;
+    }
+
+    /* hack for iPhone OS or Simulator with CoreMedia. checks user agent then adds iPhone-specific headers */
+    if (useragent && strstr(useragent, "CoreMedia"))
+    {
+        bytes = format_mp3_insert_coremedia_header(client, ptr, remaining);
+        if (bytes) {
+            ptr += bytes;
+            remaining -= bytes;
+        }
     }

     client->format_data = client_mp3;
--
1.7.1
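A note on the Range handling in the patch above: it only answers when both ends of the range are present, and it reports a fixed total length of 221183499. A minimal sketch of the same parse-and-reply step, with a truncation check on snprintf, might look like the following. The function name build_content_range and its total parameter are hypothetical names chosen for illustration; they are not Icecast functions.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Icecast): parse a "bytes=first-last"
 * Range value and write the matching Content-Range header line into out.
 * Returns the number of bytes written, or 0 if the range is missing,
 * malformed, or the line does not fit in out. */
int build_content_range(const char *range_hdr, long total,
                        char *out, size_t outlen)
{
    long first, last;
    int n;

    assert(out != NULL && outlen > 0);
    out[0] = '\0';

    if (range_hdr == NULL)
        return 0;
    /* like the patch, require both ends of the range */
    if (sscanf(range_hdr, "bytes=%ld-%ld", &first, &last) != 2)
        return 0;
    if (first < 0 || last < first)
        return 0;

    n = snprintf(out, outlen, "Content-Range: bytes %ld-%ld/%ld\r\n",
                 first, last, total);
    if (n < 0 || (size_t)n >= outlen) {
        /* truncated: report nothing rather than a partial header */
        out[0] = '\0';
        return 0;
    }
    return n;
}
```

Returning 0 on truncation keeps the caller's `ptr += bytes; remaining -= bytes;` bookkeeping safe, since snprintf otherwise returns the length it would have written rather than the length it did write.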
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 02/31] introduce thread_cond_init and refrase thread_cond_create.
thread_cond_create doesn't actually create a new thread cond (it calls pthread_cond_init). Make it actually alloc a new cond structure (that will have to be freed with free()), and add thread_cond_init retaining old behaviour.

Implement change in connection.c

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c    |    2 +-
 src/thread/thread.c |   13 ++++++++++++-
 src/thread/thread.h |    5 +++--
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 5cc922a..97fe7e9 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -144,7 +144,7 @@ void connection_initialize(void)
     thread_spin_create (&_connection_lock);
     thread_mutex_create(&move_clients_mutex);
     thread_rwlock_create(&_source_shutdown_rwlock);
-    thread_cond_create(&global.shutdown_cond);
+    thread_cond_init(&global.shutdown_cond);
     _req_queue = NULL;
     _req_queue_tail = &_req_queue;
     _con_queue = NULL;
diff --git a/src/thread/thread.c b/src/thread/thread.c
index deeb695..e5e0ac6 100644
--- a/src/thread/thread.c
+++ b/src/thread/thread.c
@@ -493,12 +493,23 @@ void thread_mutex_unlock_c(mutex_t *mutex, int line, char *file)
 #endif /* DEBUG_MUTEXES */
 }

-void thread_cond_create_c(cond_t *cond, int line, char *file)
+void thread_cond_init(cond_t *cond)
 {
     pthread_cond_init(&cond->sys_cond, NULL);
     pthread_mutex_init(&cond->cond_mutex, NULL);
 }

+cond_t *thread_cond_create(void)
+{
+    cond_t *cond = calloc (1, sizeof(cond_t));
+    if (!cond)
+        return;
+
+    thread_cond_init(cond);
+
+    return cond;
+}
+
 void thread_cond_destroy(cond_t *cond)
 {
     pthread_mutex_destroy(&cond->cond_mutex);
diff --git a/src/thread/thread.h b/src/thread/thread.h
index fe4e4a0..1484ff7 100644
--- a/src/thread/thread.h
+++ b/src/thread/thread.h
@@ -111,7 +111,7 @@ typedef mutex_t spin_t;
 #define thread_mutex_create(x) thread_mutex_create_c(x,__LINE__,__FILE__)
 #define thread_mutex_lock(x) thread_mutex_lock_c(x,__LINE__,__FILE__)
 #define thread_mutex_unlock(x) thread_mutex_unlock_c(x,__LINE__,__FILE__)
-#define thread_cond_create(x) thread_cond_create_c(x,__LINE__,__FILE__)
+//#define thread_cond_create() thread_cond_create_c(__LINE__,__FILE__)
 #define thread_cond_signal(x) thread_cond_signal_c(x,__LINE__,__FILE__)
 #define thread_cond_broadcast(x) thread_cond_broadcast_c(x,__LINE__,__FILE__)
 #define thread_cond_wait(x) thread_cond_wait_c(x,__LINE__,__FILE__)
@@ -169,7 +169,8 @@ void thread_mutex_create_c(mutex_t *mutex, int line, char *file);
 void thread_mutex_lock_c(mutex_t *mutex, int line, char *file);
 void thread_mutex_unlock_c(mutex_t *mutex, int line, char *file);
 void thread_mutex_destroy(mutex_t *mutex);
-void thread_cond_create_c(cond_t *cond, int line, char *file);
+void thread_cond_init(cond_t *cond);
+cond_t *thread_cond_create(void);
 void thread_cond_signal_c(cond_t *cond, int line, char *file);
 void thread_cond_broadcast_c(cond_t *cond, int line, char *file);
 void thread_cond_wait_c(cond_t *cond, int line, char *file);
--
1.7.1
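As posted, the allocation-failure path in thread_cond_create is a bare `return;` inside a function that returns `cond_t *`. The sketch below assumes `return NULL;` was intended and pairs the allocating constructor with a matching release function. The cond_t layout is inferred from the two fields the patch touches, and the names cond_init, cond_create and cond_free are hypothetical, not the Icecast API.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Assumed layout, inferred from the fields used in the patch. */
typedef struct {
    pthread_cond_t  sys_cond;
    pthread_mutex_t cond_mutex;
} cond_t;

/* Initialise a caller-owned cond_t (the old thread_cond_create behaviour). */
void cond_init(cond_t *cond)
{
    assert(cond != NULL);
    pthread_cond_init(&cond->sys_cond, NULL);
    pthread_mutex_init(&cond->cond_mutex, NULL);
}

/* Allocate and initialise a cond_t; returns NULL if allocation fails. */
cond_t *cond_create(void)
{
    cond_t *cond = calloc(1, sizeof(*cond));
    if (cond == NULL)
        return NULL;
    cond_init(cond);
    return cond;
}

/* Release a cond_t obtained from cond_create(); NULL is accepted. */
void cond_free(cond_t *cond)
{
    if (cond == NULL)
        return;
    pthread_cond_destroy(&cond->sys_cond);
    pthread_mutex_destroy(&cond->cond_mutex);
    free(cond);
}
```

Keeping destroy-and-free in one function means callers of the allocating variant do not need to remember that plain free() alone would skip the pthread teardown.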
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 03/31] LOGGING add non arg-counted macros.
Today everybody supports varargs?

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/logging.h |    8 ++++++++
 1 files changed, 8 insertions(+), 0 deletions(-)

diff --git a/src/logging.h b/src/logging.h
index e04cb1b..2d6ee71 100644
--- a/src/logging.h
+++ b/src/logging.h
@@ -34,22 +34,30 @@ extern int playlistlog;
 #define __func__ strrchr (__FILE__, '\\') ? strrchr (__FILE__, '\\') + 1 : __FILE__
 #endif

+#define ERROR(args...) log_write(errorlog, 1, CATMODULE "/", __func__, args)
+
 #define ERROR0(y) log_write(errorlog, 1, CATMODULE "/", __func__, y)
 #define ERROR1(y, a) log_write(errorlog, 1, CATMODULE "/", __func__, y, a)
 #define ERROR2(y, a, b) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b)
 #define ERROR3(y, a, b, c) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c)
 #define ERROR4(y, a, b, c, d) log_write(errorlog, 1, CATMODULE "/", __func__, y, a, b, c, d)

+#define WARN(args...) log_write(errorlog, 2, CATMODULE "/", __func__, args)
+
 #define WARN0(y) log_write(errorlog, 2, CATMODULE "/", __func__, y)
 #define WARN1(y, a) log_write(errorlog, 2, CATMODULE "/", __func__, y, a)
 #define WARN2(y, a, b) log_write(errorlog, 2, CATMODULE "/", __func__, y, a, b)
 #define WARN3(y, a, b, c) log_write(errorlog, 2, CATMODULE "/", __func__, y, a, b, c)

+#define INFO(args...) log_write(errorlog, 3, CATMODULE "/", __func__, args)
+
 #define INFO0(y) log_write(errorlog, 3, CATMODULE "/", __func__, y)
 #define INFO1(y, a) log_write(errorlog, 3, CATMODULE "/", __func__, y, a)
 #define INFO2(y, a, b) log_write(errorlog, 3, CATMODULE "/", __func__, y, a, b)
 #define INFO3(y, a, b, c) log_write(errorlog, 3, CATMODULE "/", __func__, y, a, b, c)

+#define DEBUG(args...) log_write(errorlog, 4, CATMODULE "/", __func__, args)
+
 #define DEBUG0(y) log_write(errorlog, 4, CATMODULE "/", __func__, y)
 #define DEBUG1(y, a) log_write(errorlog, 4, CATMODULE "/", __func__, y, a)
 #define DEBUG2(y, a, b) log_write(errorlog, 4, CATMODULE "/", __func__, y, a, b)
--
1.7.1
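On the question in the commit message: the `args...` form used in the patch above is a GNU extension, while C99 standardised `...` with `__VA_ARGS__`. A minimal sketch of the same idea in the C99 spelling follows. Here log_line is a hypothetical stand-in for log_write that formats into a buffer so the result can be inspected, and LOG_WARN / LOG_DEBUG are illustrative names rather than the macros from logging.h.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>

/* Last formatted line, kept so the result can be inspected. */
char last_line[256];

/* Hypothetical stand-in for log_write(): formats "[level] func: message"
 * into last_line and returns the length of the message part. */
int log_line(int level, const char *func, const char *fmt, ...)
{
    va_list ap;
    int prefix, body;

    prefix = snprintf(last_line, sizeof last_line, "[%d] %s: ", level, func);
    assert(prefix >= 0 && (size_t)prefix < sizeof last_line);

    va_start(ap, fmt);
    body = vsnprintf(last_line + prefix,
                     sizeof last_line - (size_t)prefix, fmt, ap);
    va_end(ap);
    return body;
}

/* C99 variadic macros: the format string is part of __VA_ARGS__, so one
 * macro covers calls with and without extra arguments. */
#define LOG_WARN(...)  log_line(2, __func__, __VA_ARGS__)
#define LOG_DEBUG(...) log_line(4, __func__, __VA_ARGS__)
```

Because the format string travels inside `__VA_ARGS__`, a call such as `LOG_DEBUG("plain")` needs no trailing-comma workaround, which is the case the numbered `DEBUG0` macro exists to cover.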
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 04/31] UTIL, add find_eos_delim and use it to simplify util_read_header (pending con)
Use memmem so we don't need a '\0'-terminated buffer; we know most sizes anyway. We now get a refbuf for free, so we attach it to the client already.

Util: use flags instead of the partial/entire mess; the code looks cleaner that way. We'll need to port back the connection changes.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/slave.c |   23 +++++++--
 src/util.c  |  142 +++++++++++++++++++++++++++++++++++++++++++++--------------
 src/util.h  |    7 ++-
 3 files changed, 130 insertions(+), 42 deletions(-)

diff --git a/src/slave.c b/src/slave.c
index 80dedca..834c1bb 100644
--- a/src/slave.c
+++ b/src/slave.c
@@ -155,11 +155,11 @@ static client_t *open_relay_connection (relay_server *relay)
     ice_config_t *config;
     http_parser_t *parser = NULL;
     connection_t *con=NULL;
+    refbuf_t *header = NULL;
     char *server = strdup (relay->server);
     char *mount = strdup (relay->mount);
     int port = relay->port;
     char *auth_header;
-    char header[4096];

     config = config_get_config ();
     server_id = strdup (config->server_id);
@@ -187,6 +187,7 @@ static client_t *open_relay_connection (relay_server *relay)
     while (redirects < 10)
     {
         sock_t streamsock;
+        int hdrsize;

         INFO2 ("connecting to %s:%d", server, port);

@@ -197,6 +198,8 @@ static client_t *open_relay_connection (relay_server *relay)
             break;
         }
         con = connection_create (streamsock, -1, strdup (server));
+        if (!con)
+            break;

         /* At this point we may not know if we are relaying an mp3 or vorbis
          * stream, but only send the icy-metadata header if the relay details
@@ -214,15 +217,17 @@ static client_t *open_relay_connection (relay_server *relay)
                 server,
                 relay->mp3metadata?"Icy-MetaData: 1\r\n":"",
                 auth_header);
-        memset (header, 0, sizeof(header));
-        if (util_read_header (con->sock, header, 4096, READ_ENTIRE_HEADER) == 0)
+
+        header = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+        hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE);
+        if (hdrsize == -ENOENT)
         {
             ERROR4 ("Header read failed for %s (%s:%d%s)", relay->localmount, server, port, mount);
             break;
         }
         parser = httpp_create_parser();
         httpp_initialize (parser, NULL);
-        if (! httpp_parse_response (parser, header, strlen(header), relay->localmount))
+        if (! httpp_parse_response (parser, header->data, hdrsize, relay->localmount))
         {
             ERROR4("Error parsing relay request for %s (%s:%d%s)", relay->localmount,
                     server, port, mount);
@@ -280,7 +285,13 @@ static client_t *open_relay_connection (relay_server *relay)
         }
         global_unlock ();
         sock_set_blocking (streamsock, 0);
-        client_set_queue (client, NULL);
+
+        header->len -= hdrsize;
+        memmove(header->data, header->data + hdrsize, header->len);
+        client_set_queue (client, header);
+        refbuf_release(header);
+
+        client->pos = hdrsize;
         free (server);
         free (mount);
         free (server_id);
@@ -297,6 +308,8 @@ static client_t *open_relay_connection (relay_server *relay)
     free (auth_header);
     if (con)
         connection_close (con);
+    if (header)
+        refbuf_release (header);
     if (parser)
         httpp_destroy (parser);
     return NULL;
diff --git a/src/util.c b/src/util.c
index 2894701..94b06f6 100644
--- a/src/util.c
+++ b/src/util.c
@@ -84,51 +84,125 @@ int util_timed_wait_for_fd(sock_t fd, int timeout)
 #endif
 }

-int util_read_header(sock_t sock, char *buff, unsigned long len, int entire)
+int util_find_eos_delim(refbuf_t *refbuf, int offset, int flags)
 {
-    int read_bytes, ret;
-    unsigned long pos;
-    char c;
+    int len = refbuf->len;
+
+    if (offset < 0) {
+        len = -offset;
+        offset = 0;
+    }
+
+    /* handle \n, \r\n and nsvcap which for some strange reason has
+     * EOL as \r\r\n */
+    char *ptr;
+    switch (flags) {
+    case HEADER_READ_LINE:
+        /* password line */
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\r\n", 3);
+        if (ptr)
+            return ((ptr+3) - refbuf->data + offset);
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\n", 2);
+        if (ptr)
+            return ((ptr+2) - refbuf->data + offset);
+        ptr = memmem (refbuf->data + offset, len - offset, "\n", 1);
+        if (ptr)
+            return ((ptr+1) - refbuf->data + offset);
+        break;
+    case HEADER_READ_ENTIRE:
+        /* stream_offset refers to the start of any data sent after the
+         * http style headers, we don't want to lose those */
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\r\n\r\r\n", 6);
+        if (ptr)
+            return ((ptr+6) - refbuf->data + offset);
+
+        ptr = memmem (refbuf->data + offset, len - offset, "\r\n\r\n", 4);
+        if (ptr)
+            return ((ptr+4) - refbuf->data + offset);
+
+        ptr = memmem (refbuf->data + offset, len - offset, "\n\n", 2);
+        if (ptr)
+            return ((ptr+2) - refbuf->data + offset);
+        break;
+    default:
+        WARN ("Unhandled flag: %d", flags);
+    }
+
+    return -ENOENT;
+}
+
+int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
+{
+    int bytes, pos, endpos = -ENOENT;
     ice_config_t *config;
     int header_timeout;

+    if (!refbuf) {
+        WARN ("No refbuf !");
+        return -ENOENT;
+    }
+
     config = config_get_config();
-    header_timeout = config->header_timeout;
+    header_timeout = config->header_timeout*1000;
     config_release_config();

-    read_bytes = 1;
-    pos = 0;
-    ret = 0;
-
-    while ((read_bytes == 1) && (pos < (len - 1))) {
-        read_bytes = 0;
+    if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
+        INFO("util_timed_wait_for_fd <= 0");
+        return -EAGAIN;
+    }

-        if (util_timed_wait_for_fd(sock, header_timeout*1000) > 0) {
+    if (refbuf->sync_point < 0) {
+        DEBUG ("REENTRING, got and old non-resolved sync");
+        pos = -refbuf->sync_point;
+    } else if (refbuf->sync_point > 0) {
+        DEBUG ("REENTRING, got and old resolved sync");
+        endpos = pos = refbuf->sync_point;
+    } else {
+        DEBUG ("FIRST TIME, no sync");
+        pos = 0;
+    }

-            if ((read_bytes = recv(sock, &c, 1, 0))) {
-                if (c != '\r') buff[pos++] = c;
-                if (entire) {
-                    if ((pos > 1) && (buff[pos - 1] == '\n' &&
-                                      buff[pos - 2] == '\n')) {
-                        ret = 1;
-                        break;
-                    }
-                }
-                else {
-                    if ((pos > 1) && (buff[pos - 1] == '\n')) {
-                        ret = 1;
-                        break;
-                    }
-                }
+    while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) >= 0) {
+        if (bytes == 0)
+            con->error = 1;
+        if (bytes == -1 && !sock_recoverable (sock_error()))
+            con->error = 1;
+
+        DEBUG("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos);
+        /* this is used for re-entrance, so we get a new chance to read */
+        if (endpos == -ENOENT)
+            endpos = util_find_eos_delim (refbuf, -(bytes + pos), flags);
+        if (endpos != -ENOENT) {
+            INFO("found it, read %d, left for you: %d, starting %s",
+                 pos + bytes, pos + bytes - endpos, refbuf->data);
+            if (pos + bytes - endpos > 0) {
+                refbuf->len = pos + bytes;
+                INFO("ok got everything");
+                refbuf->sync_point = 0;
+                return endpos;
             }
-        } else {
-            break;
+            INFO ("missing client data, come back for more");
+            refbuf->sync_point = endpos;
+            return endpos;
         }
-    }

-    if (ret) buff[pos] = '\0';
+        pos += bytes;

-    return ret;
+        if (refbuf->len - pos <= 0) {
+            WARN ("Looked for endpos up to %d, but couldn't find it,??? well data is %s", pos, refbuf->data);
+            return -ENOMEM;
+        }
+
+        if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
+            INFO ("util_timed_wait_for_fd <= 0");
+            refbuf->sync_point = -pos;
+            return -EAGAIN;
+        }
+    }
+
+    WARN("Couldn't find enough data, pos = %d, data = %s, entire = %d.\n", pos, refbuf->data, flags);
+    refbuf->sync_point = 0;
+    return -ENOENT;
 }

 char *util_get_extension(const char *path) {
diff --git a/src/util.h b/src/util.h
index ac44f89..7ce8ed6 100644
--- a/src/util.h
+++ b/src/util.h
@@ -16,13 +16,14 @@
 #define XSLT_CONTENT 1
 #define HTML_CONTENT 2

-#define READ_ENTIRE_HEADER 1
-#define READ_LINE 0
+#define HEADER_READ_ENTIRE 0
+#define HEADER_READ_LINE 1

 #define MAX_LINE_LEN 512

 int util_timed_wait_for_fd(sock_t fd, int timeout);
-int util_read_header(sock_t sock, char *buff, unsigned long len, int entire);
+int util_find_eos_delim(refbuf_t *refbuf, int offset, int flags);
+int util_read_header(connection_t *con, refbuf_t *refbuf, int flags);
 int util_check_valid_extension(const char *uri);
 char *util_get_extension(const char *path);
 char *util_get_path_from_uri(char *uri);
--
1.7.1
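The delimiter search in util_find_eos_delim above can be sketched in isolation as follows. This is a simplified illustration, not the patch's code: find_header_end and find_bytes are hypothetical names, the buffer is a plain pointer plus length instead of a refbuf_t, and a small loop replaces memmem (a GNU extension) so the sketch builds without _GNU_SOURCE. Unlike the patch, the sketch does not add the search offset to the returned position, since the pointer difference is already measured from the start of the buffer; whether the extra `+ offset` in the patch is intended is not clear from the diff.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Portable stand-in for GNU memmem(): first occurrence of needle in hay. */
static const char *find_bytes(const char *hay, size_t hlen,
                              const char *needle, size_t nlen)
{
    size_t i;

    if (nlen == 0 || hlen < nlen)
        return NULL;
    for (i = 0; i + nlen <= hlen; i++)
        if (memcmp(hay + i, needle, nlen) == 0)
            return hay + i;
    return NULL;
}

/* Hypothetical helper: return the offset of the first byte after the
 * blank line that ends the headers, or -1 if no terminator is present in
 * the first len bytes. The buffer does not need to be NUL-terminated.
 * Delimiters are tried in the same order as in the patch. */
long find_header_end(const char *buf, size_t len)
{
    static const char *const delims[] = { "\r\r\n\r\r\n", "\r\n\r\n", "\n\n" };
    size_t i;

    assert(buf != NULL);
    for (i = 0; i < sizeof delims / sizeof delims[0]; i++) {
        size_t dlen = strlen(delims[i]);
        const char *p = find_bytes(buf, len, delims[i], dlen);
        if (p != NULL)
            return (long)(p - buf) + (long)dlen;
    }
    return -1;
}
```

The returned offset plays the role of stream_offset in the series: everything from that position onward is payload that arrived in the same read as the headers and must not be dropped.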
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 05/31] connection: make process_request_queue use util_find_eos_delim
We use the negative offset hack here, make sure it's in tree

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   46 +++-------------------------------------------
 1 files changed, 3 insertions(+), 43 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 97fe7e9..ae5056e 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -608,50 +608,10 @@ static void process_request_queue (void)

         if (len > 0)
         {
-            int pass_it = 1;
-            char *ptr;
-
-            /* handle \n, \r\n and nsvcap which for some strange reason has
-             * EOL as \r\r\n */
             node->offset += len;
-            client->refbuf->data [node->offset] = '\000';
-            do
-            {
-                if (node->shoutcast == 1)
-                {
-                    /* password line */
-                    if (strstr (client->refbuf->data, "\r\r\n") != NULL)
-                        break;
-                    if (strstr (client->refbuf->data, "\r\n") != NULL)
-                        break;
-                    if (strstr (client->refbuf->data, "\n") != NULL)
-                        break;
-                }
-                /* stream_offset refers to the start of any data sent after the
-                 * http style headers, we don't want to lose those */
-                ptr = strstr (client->refbuf->data, "\r\r\n\r\r\n");
-                if (ptr)
-                {
-                    node->stream_offset = (ptr+6) - client->refbuf->data;
-                    break;
-                }
-                ptr = strstr (client->refbuf->data, "\r\n\r\n");
-                if (ptr)
-                {
-                    node->stream_offset = (ptr+4) - client->refbuf->data;
-                    break;
-                }
-                ptr = strstr (client->refbuf->data, "\n\n");
-                if (ptr)
-                {
-                    node->stream_offset = (ptr+2) - client->refbuf->data;
-                    break;
-                }
-                pass_it = 0;
-            } while (0);
-
-            if (pass_it)
-            {
+            if ((len = util_find_eos_delim(client->refbuf, -node->offset,
+                            node->shoutcast?HEADER_READ_LINE:HEADER_READ_ENTIRE)) < 0) {
+                node->stream_offset = len;
                 if ((client_queue_t **)_req_queue_tail == &(node->next))
                     _req_queue_tail = (volatile client_queue_t **)node_ref;
                 *node_ref = node->next;
--
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 06/31] connection.c: use util_find_eos_delim to simplify _handle_shoutcast_compatible
This lets us replace the strcmp with a memmem, and we no longer reach the error path through an else.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   32 ++++++++------------------------
 1 files changed, 8 insertions(+), 24 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index ae5056e..dac34a0 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1122,8 +1122,9 @@ static void _handle_shoutcast_compatible (client_queue_t *node)

     if (node->shoutcast == 1)
     {
-        char *source_password, *ptr, *headers;
+        char *source_password, *headers;
         mount_proxy *mountinfo = config_find_mount (config, shoutcast_mount);
+        int hdrlen;

         if (mountinfo && mountinfo->password)
             source_password = strdup (mountinfo->password);
@@ -1131,35 +1132,18 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
             source_password = strdup (config->source_password);
         config_release_config();

-        /* Get rid of trailing \r\n or \n after password */
-        ptr = strstr (client->refbuf->data, "\r\r\n");
-        if (ptr)
-            headers = ptr+3;
-        else
-        {
-            ptr = strstr (client->refbuf->data, "\r\n");
-            if (ptr)
-                headers = ptr+2;
-            else
-            {
-                ptr = strstr (client->refbuf->data, "\n");
-                if (ptr)
-                    headers = ptr+1;
-            }
-        }

-        if (ptr == NULL)
-        {
+        if ((hdrlen = util_find_eos_delim (client->refbuf, 0, HEADER_READ_LINE)) < 0) {
             client_destroy (client);
             free (source_password);
             free (node->shoutcast_mount);
             free (node);
             return;
         }
-        *ptr = '\0';

-        if (strcmp (client->refbuf->data, source_password) == 0)
-        {
+        headers = client->refbuf->data + hdrlen;
+
+        if (memmem (client->refbuf->data, hdrlen, source_password, strlen(source_password)) != NULL) {
             client->respcode = 200;
             /* send this non-blocking but if there is only a partial write
              * then leave to header timeout */
@@ -1172,8 +1156,8 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
             free (source_password);
             return;
         }
-        else
-            INFO1 ("password does not match \"%s\"", client->refbuf->data);
+
+        INFO1 ("password does not match \"%s\"", client->refbuf->data);
         client_destroy (client);
         free (source_password);
         free (node->shoutcast_mount);
--
1.7.1
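One behavioural difference in the patch above is worth spelling out: memmem over the password line succeeds if the password occurs anywhere in that line, whereas the strcmp it replaces required the whole line to equal the password. A minimal sketch of an exact comparison that still works on a length-delimited buffer is shown below. The function line_equals_password is a hypothetical name for illustration and is not part of the patch; note also that memcmp is not a constant-time comparison.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: compare the first line of a buffer with a password.
 * line_len is the length of the line including its terminator (\n, \r\n
 * or \r\r\n), i.e. the kind of value util_find_eos_delim returns for
 * HEADER_READ_LINE when the search starts at offset 0.
 * Returns 1 on an exact match, 0 otherwise. */
int line_equals_password(const char *line, size_t line_len,
                         const char *password)
{
    size_t plen;

    assert(line != NULL && password != NULL);

    /* strip the trailing end-of-line bytes before comparing */
    while (line_len > 0 &&
           (line[line_len - 1] == '\n' || line[line_len - 1] == '\r'))
        line_len--;

    plen = strlen(password);
    return line_len == plen && memcmp(line, password, plen) == 0;
}
```

Comparing lengths first keeps the check exact without writing a NUL into the buffer, which was the reason the original code needed `*ptr = '\0'` before calling strcmp.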
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 07/31] Connection: make _handle_shoutcast_compatible more readable,
Drop the headers pointer that pointed to the end of the headers. Introduce refbuf = client->refbuf; this cleans things up quite a lot.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   22 ++++++++++------------
 1 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index dac34a0..e558cd0 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1114,6 +1114,7 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
     ice_config_t *config = config_get_config ();
     char *shoutcast_mount;
     client_t *client = node->client;
+    refbuf_t *refbuf = client->refbuf;

     if (node->shoutcast_mount)
         shoutcast_mount = node->shoutcast_mount;
@@ -1122,7 +1123,7 @@ static void _handle_shoutcast_compatible (client_queue_t *node)

     if (node->shoutcast == 1)
     {
-        char *source_password, *headers;
+        char *source_password;
         mount_proxy *mountinfo = config_find_mount (config, shoutcast_mount);
         int hdrlen;

@@ -1132,7 +1133,6 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
             source_password = strdup (config->source_password);
         config_release_config();

-
         if ((hdrlen = util_find_eos_delim (client->refbuf, 0, HEADER_READ_LINE)) < 0) {
             client_destroy (client);
             free (source_password);
@@ -1141,15 +1141,13 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
             return;
         }

-        headers = client->refbuf->data + hdrlen;
-
-        if (memmem (client->refbuf->data, hdrlen, source_password, strlen(source_password)) != NULL) {
+        if (memmem (refbuf->data, hdrlen, source_password, strlen(source_password)) != NULL) {
             client->respcode = 200;
             /* send this non-blocking but if there is only a partial write
              * then leave to header timeout */
             sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n");
-            node->offset -= (headers - client->refbuf->data);
-            memmove (client->refbuf->data, headers, node->offset+1);
+            node->offset -= hdrlen;
+            memmove (refbuf->data, refbuf->data + hdrlen, node->offset+1);
             node->shoutcast = 2;
             /* we've checked the password, now send it back for reading headers */
             _add_request_queue (node);
@@ -1173,19 +1171,19 @@ static void _handle_shoutcast_compatible (client_queue_t *node)
     http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset;
     http_compliant = (char *)calloc(1, http_compliant_len);
     snprintf (http_compliant, http_compliant_len,
-            "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, client->refbuf->data);
+            "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, refbuf->data);
     parser = httpp_create_parser();
     httpp_initialize(parser, NULL);
     if (httpp_parse (parser, http_compliant, strlen(http_compliant)))
     {
         /* we may have more than just headers, so prepare for it */
         if (node->stream_offset == node->offset)
-            client->refbuf->len = 0;
+            refbuf->len = 0;
         else
         {
-            char *ptr = client->refbuf->data;
-            client->refbuf->len = node->offset - node->stream_offset;
-            memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
+            char *ptr = refbuf->data;
+            refbuf->len = node->offset - node->stream_offset;
+            memmove (ptr, ptr + node->stream_offset, refbuf->len);
         }
         client->parser = parser;
         source_startup (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH);
--
1.7.1
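Both memmove calls in the patch above do the same job: once a prefix of the buffer (the password line, or the full headers) has been consumed, the bytes that arrived after it are shifted to the front so the next stage sees them at offset 0. A minimal sketch of that step on a plain buffer follows; discard_prefix is a hypothetical name, and the patch itself operates on refbuf->data with node->offset and node->stream_offset rather than on these parameters.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: drop the first `consumed` bytes of a buffer that
 * currently holds `filled` valid bytes, moving the remainder to the front.
 * Returns the number of valid bytes left. memmove is used because the
 * source and destination regions overlap. */
size_t discard_prefix(char *data, size_t filled, size_t consumed)
{
    assert(data != NULL);

    if (consumed >= filled)
        return 0;               /* nothing beyond the consumed prefix */

    memmove(data, data + consumed, filled - consumed);
    return filled - consumed;
}
```

Returning the new length mirrors how the patch updates refbuf->len (or node->offset) alongside the move, so the buffer contents and the byte count stay in step.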
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 08/31] connection: _handle_connection re-roll logic in a more readeable way.
We have a lot of "if (something) { do something ... } else break;" constructs.
Change them to bail out as soon as possible, keeping the indentation flat.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |  127 ++++++++++++++++++++++++++----------------------------
 1 files changed, 61 insertions(+), 66 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index e558cd0..bb6cdc8 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1213,84 +1213,79 @@ static void _handle_connection(void)
     while (1)
     {
         node = _get_connection();
-        if (node)
-        {
-            client_t *client = node->client;
-
-            /* Check for special shoutcast compatability processing */
-            if (node->shoutcast)
-            {
-                _handle_shoutcast_compatible (node);
-                continue;
-            }
+        if (! node)
+            break;

-            /* process normal HTTP headers */
-            parser = httpp_create_parser();
-            httpp_initialize(parser, NULL);
-            client->parser = parser;
-            if (httpp_parse (parser, client->refbuf->data, node->offset))
-            {
-                char *uri;
+        client_t *client = node->client;
+        char *uri;

-                /* we may have more than just headers, so prepare for it */
-                if (node->stream_offset == node->offset)
-                    client->refbuf->len = 0;
-                else
-                {
-                    char *ptr = client->refbuf->data;
-                    client->refbuf->len = node->offset - node->stream_offset;
-                    memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
-                }
+        /* Check for special shoutcast compatability processing */
+        if (node->shoutcast)
+        {
+            _handle_shoutcast_compatible (node);
+            continue;
+        }

-                rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
+        /* process normal HTTP headers */
+        parser = httpp_create_parser();
+        httpp_initialize(parser, NULL);
+        client->parser = parser;
+        if (!httpp_parse (parser, client->refbuf->data, node->offset))
+        {
+            free (node);
+            ERROR0("HTTP request parsing failed");
+            client_destroy (client);
+            continue;
+        }

-                /* assign a port-based shoutcast mountpoint if required */
-                if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
-                    httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);
+        /* we may have more than just headers, so prepare for it */
+        if (node->stream_offset == node->offset) {
+            client->refbuf->len = 0;
+        } else {
+            char *ptr = client->refbuf->data;
+            client->refbuf->len = node->offset - node->stream_offset;
+            memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
+        }

-                free (node->shoutcast_mount);
-                free (node);
+        rawuri = httpp_getvar(parser, HTTPP_VAR_URI);

-                if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
-                    strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
-                    ERROR0("Bad HTTP protocol detected");
-                    client_destroy (client);
-                    continue;
-                }
+        /* assign a port-based shoutcast mountpoint if required */
+        if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0)
+            httpp_set_query_param (client->parser, "mount", node->shoutcast_mount);

-                uri = util_normalise_uri(rawuri);
+        free (node->shoutcast_mount);
+        free (node);

-                if (uri == NULL)
-                {
-                    client_destroy (client);
-                    continue;
-                }
+        if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
+            strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
+            ERROR0("Bad HTTP protocol detected");
+            client_destroy (client);
+            continue;
+        }

-                if (parser->req_type == httpp_req_source) {
-                    _handle_source_request (client, uri);
-                }
-                else if (parser->req_type == httpp_req_stats) {
-                    _handle_stats_request (client, uri);
-                }
-                else if (parser->req_type == httpp_req_get) {
-                    _handle_get_request (client, uri);
-                }
-                else {
-                    ERROR0("Wrong request type from client");
-                    client_send_400 (client, "unknown request");
-                }
+        uri = util_normalise_uri(rawuri);

-                free(uri);
-            }
-            else
-            {
-                free (node);
-                ERROR0("HTTP request parsing failed");
-                client_destroy (client);
-            }
+        if (uri == NULL)
+        {
+            client_destroy (client);
             continue;
         }
-        break;
+
+        if (parser->req_type == httpp_req_source) {
+            _handle_source_request (client, uri);
+        }
+        else if (parser->req_type == httpp_req_stats) {
+            _handle_stats_request (client, uri);
+        }
+        else if (parser->req_type == httpp_req_get) {
+            _handle_get_request (client, uri);
+        }
+        else {
+            ERROR0("Wrong request type from client");
+            client_send_400 (client, "unknown request");
+        }
+
+        free(uri);
     }
 }

-- 
1.7.1
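For readers who want the bail-out-early idea in isolation, here is a minimal sketch. The item_t type and sum_valid function are made up for illustration and are not part of Icecast; only the control-flow shape mirrors the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical work item; NOT an Icecast type. */
typedef struct item {
    int valid;              /* 0 means the item failed validation */
    int value;
    struct item *next;
} item_t;

/* Sum the valid items, leaving each iteration as early as possible. */
int sum_valid (item_t **queue)
{
    int total = 0;

    assert (queue != NULL);
    while (1)
    {
        item_t *node = *queue;

        if (!node)
            break;              /* queue drained: leave the loop */
        *queue = node->next;

        if (!node->valid)
            continue;           /* error path first, then move on */

        total += node->value;   /* main path stays at one indent level */
    }
    return total;
}
```

Each failure case exits the iteration on its own line, so the successful path never needs an else branch or a deeper nesting level.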
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 09/31] connection: simplify _handle_connection (make it more readable)
client->refbuf -> refbuf cleanups

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   12 +++++++-----
 1 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index bb6cdc8..82112b2 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1217,8 +1217,10 @@ static void _handle_connection(void)
             break;

         client_t *client = node->client;
+        refbuf_t *refbuf = client->refbuf;
         char *uri;

+
         /* Check for special shoutcast compatability processing */
         if (node->shoutcast)
         {
@@ -1230,7 +1232,7 @@ static void _handle_connection(void)
         parser = httpp_create_parser();
         httpp_initialize(parser, NULL);
         client->parser = parser;
-        if (!httpp_parse (parser, client->refbuf->data, node->offset))
+        if (!httpp_parse (parser, refbuf->data, node->offset))
         {
             free (node);
             ERROR0("HTTP request parsing failed");
@@ -1240,11 +1242,11 @@ static void _handle_connection(void)

         /* we may have more than just headers, so prepare for it */
         if (node->stream_offset == node->offset) {
-            client->refbuf->len = 0;
+            refbuf->len = 0;
         } else {
-            char *ptr = client->refbuf->data;
-            client->refbuf->len = node->offset - node->stream_offset;
-            memmove (ptr, ptr + node->stream_offset, client->refbuf->len);
+            char *ptr = refbuf->data;
+            refbuf->len = node->offset - node->stream_offset;
+            memmove (ptr, ptr + node->stream_offset, refbuf->len);
         }

         rawuri = httpp_getvar(parser, HTTPP_VAR_URI);
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 10/31] Connection: _accept_connection, simplify logic
Execute test only when needed.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    7 ++++---
 1 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 82112b2..dcc5b39 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -533,10 +533,11 @@ static connection_t *_accept_connection(int duration)
         if (strncmp (ip, "::ffff:", 7) == 0)
             memmove (ip, ip+7, strlen (ip+7)+1);

-        if (accept_ip_address (ip))
+        if (accept_ip_address (ip)) {
             con = connection_create (sock, serversock, ip);
-        if (con)
-            return con;
+            if (con)
+                return con;
+        }
         sock_close (sock);
     }
     else
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 11/31] Connection: connection_accept_loop, pass timeout
This saves one config lookup (and its locking) on every call.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    9 ++++-----
 1 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index dcc5b39..2866ade 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -585,12 +585,9 @@ static client_queue_t *_get_connection(void)


 /* run along queue checking for any data that has come in or a timeout */
-static void process_request_queue (void)
+static void process_request_queue (int timeout)
 {
     client_queue_t **node_ref = (client_queue_t **)&_req_queue;
-    ice_config_t *config = config_get_config ();
-    int timeout = config->header_timeout;
-    config_release_config();

     while (*node_ref)
     {
@@ -654,9 +651,11 @@ void connection_accept_loop (void)
     connection_t *con;
     ice_config_t *config;
     int duration = 300;
+    int timeout = 0;

     config = config_get_config ();
     get_ssl_certificate (config);
+    timeout = config->header_timeout;
     config_release_config ();

     while (global.running == ICE_RUNNING)
@@ -724,7 +723,7 @@ void connection_accept_loop (void)
                 if (_req_queue == NULL)
                     duration = 300; /* use longer timeouts when nothing waiting */
             }
-            process_request_queue ();
+            process_request_queue (timeout);
         }

         /* Give all the other threads notification to shut down */
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 12/31] Connection: refactor source_startup
Unroll the logic: "if (!blah) { bail } do something else" is much nicer than
"if (blah) { do something else; } else { bail }".

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   67 +++++++++++++++++++++++++++--------------------------
 1 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 2866ade..4dfbad7 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -993,45 +993,46 @@ static void _handle_source_request (client_t *client, const char *uri)
 void source_startup (client_t *client, const char *uri, int auth_style)
 {
     source_t *source;
+    refbuf_t *ok;
     source = source_reserve (uri);

-    if (source)
-    {
-        source->client = client;
-        source->parser = client->parser;
-        source->con = client->con;
-        if (connection_complete_source (source, 1) < 0)
-        {
-            source_clear_source (source);
-            source_free_source (source);
-            return;
-        }
-        client->respcode = 200;
-        if (auth_style == SHOUTCAST_SOURCE_AUTH)
-        {
-            source->shoutcast_compat = 1;
-            source_client_callback (client, source);
-        }
-        else
-        {
-            refbuf_t *ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);
-            client->respcode = 200;
-            snprintf (ok->data, PER_CLIENT_REFBUF_SIZE,
-                    "HTTP/1.0 200 OK\r\n\r\n");
-            ok->len = strlen (ok->data);
-            /* we may have unprocessed data read in, so don't overwrite it */
-            ok->associated = client->refbuf;
-            client->refbuf = ok;
-            fserve_add_client_callback (client, source_client_callback, source);
-        }
-    }
-    else
-    {
+    if (!source) {
         client_send_403 (client, "Mountpoint in use");
         WARN1 ("Mountpoint %s in use", uri);
+        return;
     }
-}

+    source->client = client;
+    source->parser = client->parser;
+    source->con = client->con;
+    if (connection_complete_source (source, 1) < 0) {
+        source_clear_source (source);
+        source_free_source (source);
+        return;
+    }
+    client->respcode = 200;
+    switch (auth_style) {
+    case SHOUTCAST_SOURCE_AUTH:
+        source->shoutcast_compat = 1;
+    case NOAUTH_SOURCE_AUTH:
+        source_client_callback (client, source);
+        break;
+    case ICECAST_SOURCE_AUTH:
+        ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);
+        client->respcode = 200;
+        snprintf (ok->data, PER_CLIENT_REFBUF_SIZE,
+                "HTTP/1.0 200 OK\r\n\r\n");
+        ok->len = strlen (ok->data);
+        /* we may have unprocessed data read in, so don't overwrite it */
+        ok->associated = client->refbuf;
+        client->refbuf = ok;
+        fserve_add_client_callback (client, source_client_callback, source);
+        break;
+    default:
+        WARN1("Got unkown source auth type: %d", auth_style);
+        return;
+    }
+}

 static void _handle_stats_request (client_t *client, char *uri)
 {
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 13/31] Connection: let everything go through fserve
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    3 +--
 1 files changed, 1 insertions(+), 2 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 4dfbad7..8549f58 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1015,7 +1015,6 @@ void source_startup (client_t *client, const char *uri, int auth_style)
     case SHOUTCAST_SOURCE_AUTH:
         source->shoutcast_compat = 1;
     case NOAUTH_SOURCE_AUTH:
-        source_client_callback (client, source);
         break;
     case ICECAST_SOURCE_AUTH:
         ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);
@@ -1026,12 +1025,12 @@ void source_startup (client_t *client, const char *uri, int auth_style)
         /* we may have unprocessed data read in, so don't overwrite it */
         ok->associated = client->refbuf;
         client->refbuf = ok;
-        fserve_add_client_callback (client, source_client_callback, source);
         break;
     default:
         WARN1("Got unkown source auth type: %d", auth_style);
         return;
     }
+    fserve_add_client_callback (client, source_client_callback, source);
 }

 static void _handle_stats_request (client_t *client, char *uri)
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 14/31] Add POST support. WARNING, still no AUTH
This adds the ability to use POST requests with IceCast, so that any client
can push a stream. It still doesn't support AUTH, but implementing some kind
of HTTP auth should be straightforward. This can be useful for pushing
directly from FFMPEG into IceCast without having to wrap the stream in Icy.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   10 ++++++++++
 1 files changed, 10 insertions(+), 0 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 8549f58..ed8bea9 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -78,6 +78,7 @@
    Icecast auth style uses HTTP and Basic Authorization.
 */

+#define NOAUTH_SOURCE_AUTH 2
 #define SHOUTCAST_SOURCE_AUTH 1
 #define ICECAST_SOURCE_AUTH 0

@@ -960,7 +961,13 @@ int connection_check_pass (http_parser_t *parser, const char *user, const char *
     }
     return ret;
 }
+/* XXX(xaiki): This may need AUTH support */
+static void _handle_post_request (client_t *client, const char *uri)
+{
+    INFO1("Source logging in at mountpoint \"%s\"", uri);
+    source_startup (client, uri, NOAUTH_SOURCE_AUTH);
+}


 /* only called for native icecast source clients */
 static void _handle_source_request (client_t *client, const char *uri)
@@ -1276,6 +1283,9 @@ static void _handle_connection(void)
         if (parser->req_type == httpp_req_source) {
             _handle_source_request (client, uri);
         }
+        else if (parser->req_type == httpp_req_post) {
+            _handle_post_request (client, uri);
+        }
         else if (parser->req_type == httpp_req_stats) {
             _handle_stats_request (client, uri);
         }
-- 
1.7.1
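To make the client side concrete, this is a guess at what the head of such a push could look like on the wire, assuming the patched server accepts a plain HTTP/1.0 POST with no credentials. build_post_head is a hypothetical helper; the Content-Type header is included on the assumption that the server uses it to pick the stream format.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, NOT part of Icecast: write the request head for
 * pushing a stream to `mount` into buf. Returns the header length, or -1
 * if buf is too small. The stream data would follow the blank line. */
int build_post_head (char *buf, size_t size,
                     const char *mount, const char *content_type)
{
    int len;

    assert (buf != NULL && mount != NULL && content_type != NULL);
    len = snprintf (buf, size,
            "POST %s HTTP/1.0\r\n"
            "Content-Type: %s\r\n"
            "\r\n",
            mount, content_type);
    if (len < 0 || (size_t)len >= size)
        return -1;          /* output error or truncated header */
    return len;
}
```

A caller would send the returned bytes over the socket and then keep writing encoded audio on the same connection.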
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 15/31] Connection: simplify in-connection handling (kill 1 function, and break shoutcast)
Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 395 ++++++++++++++++++------------------------------------ 1 files changed, 131 insertions(+), 264 deletions(-) diff --git a/src/connection.c b/src/connection.c index ed8bea9..5c9e96e 100644 --- a/src/connection.c +++ b/src/connection.c @@ -108,7 +108,6 @@ static spin_t _connection_lock; static volatile unsigned long _current_id = 0; static int _initialized = 0; -static volatile client_queue_t *_req_queue = NULL, **_req_queue_tail = &_req_queue; static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; static int ssl_ok; #ifdef HAVE_OPENSSL @@ -120,7 +119,8 @@ cache_file_contents banned_ip, allowed_ip; rwlock_t _source_shutdown_rwlock; -static void _handle_connection(void); +static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount); +static int _handle_client (client_t *client); static int compare_ip (void *arg, void *a, void *b) { @@ -146,8 +146,6 @@ void connection_initialize(void) thread_mutex_create(&move_clients_mutex); thread_rwlock_create(&_source_shutdown_rwlock); thread_cond_init(&global.shutdown_cond); - _req_queue = NULL; - _req_queue_tail = &_req_queue; _con_queue = NULL; _con_queue_tail = &_con_queue; @@ -553,100 +551,6 @@ static connection_t *_accept_connection(int duration) return NULL; } - -/* add client to connection queue. At this point some header information - * has been collected, so we now pass it onto the connection thread for - * further processing - */ -static void _add_connection (client_queue_t *node) -{ - *_con_queue_tail = node; - _con_queue_tail = (volatile client_queue_t **)&node->next; -} - - -/* this returns queued clients for the connection thread. headers are - * already provided, but need to be parsed. 
- */ -static client_queue_t *_get_connection(void) -{ - client_queue_t *node = NULL; - - /* common case, no new connections so don't bother taking locks */ - if (_con_queue) - { - node = (client_queue_t *)_con_queue; - _con_queue = node->next; - if (_con_queue == NULL) - _con_queue_tail = &_con_queue; - node->next = NULL; - } - return node; -} - - -/* run along queue checking for any data that has come in or a timeout */ -static void process_request_queue (int timeout) -{ - client_queue_t **node_ref = (client_queue_t **)&_req_queue; - - while (*node_ref) - { - client_queue_t *node = *node_ref; - client_t *client = node->client; - int len = PER_CLIENT_REFBUF_SIZE - 1 - node->offset; - char *buf = client->refbuf->data + node->offset; - - if (len > 0) - { - if (client->con->con_time + timeout <= time(NULL)) - len = 0; - else - len = client_read_bytes (client, buf, len); - } - - if (len > 0) - { - node->offset += len; - if ((len = util_find_eos_delim(client->refbuf, -node->offset, - node->shoutcast?HEADER_READ_LINE:HEADER_READ_ENTIRE)) < 0) { - node->stream_offset = len; - if ((client_queue_t **)_req_queue_tail == &(node->next)) - _req_queue_tail = (volatile client_queue_t **)node_ref; - *node_ref = node->next; - node->next = NULL; - _add_connection (node); - continue; - } - } - else - { - if (len == 0 || client->con->error) - { - if ((client_queue_t **)_req_queue_tail == &node->next) - _req_queue_tail = (volatile client_queue_t **)node_ref; - *node_ref = node->next; - client_destroy (client); - free (node); - continue; - } - } - node_ref = &node->next; - } - _handle_connection(); -} - - -/* add node to the queue of requests. This is where the clients are when - * initial http details are read. 
- */ -static void _add_request_queue (client_queue_t *node) -{ - *_req_queue_tail = node; - _req_queue_tail = (volatile client_queue_t **)&node->next; -} - - void connection_accept_loop (void) { connection_t *con; @@ -663,68 +567,99 @@ void connection_accept_loop (void) { con = _accept_connection (duration); - if (con) - { - client_queue_t *node; - ice_config_t *config; - client_t *client = NULL; - listener_t *listener; + if (!con) { + duration = 300; /* use longer timeouts when nothing waiting */ + continue; + } - global_lock(); - if (client_create (&client, con, NULL) < 0) - { - global_unlock(); - client_send_403 (client, "Icecast connection limit reached"); - /* don't be too eager as this is an imposed hard limit */ - thread_sleep (400000); - continue; - } + ice_config_t *config; + client_t *client = NULL; + listener_t *listener; + refbuf_t *header = NULL; + http_parser_t *parser = NULL; + int hdrsize = 0; + int shoutcast = 0; + char *shoutcast_mount = NULL; + + header = refbuf_new (PER_CLIENT_REFBUF_SIZE); + hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); + if (hdrsize < 0) + { + global_unlock(); + ERROR ("Header read failed"); + thread_sleep (400000); + continue; + } - /* setup client for reading incoming http */ - client->refbuf->data [PER_CLIENT_REFBUF_SIZE-1] = '\000'; + /* process normal HTTP headers */ + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + if (!httpp_parse (parser, header->data, hdrsize)) + { + ERROR0("HTTP request parsing failed"); + client_destroy (client); + continue; + } - if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock)) - { - global_unlock(); - WARN0 ("failed to set tcp options on client connection, dropping"); - client_destroy (client); - continue; - } + if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) + { + ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); + break; + } - node = calloc (1, sizeof (client_queue_t)); - if (node == NULL) - { - 
global_unlock(); - client_destroy (client); - continue; - } - node->client = client; + global_lock(); + if (client_create (&client, con, parser) < 0) + { + global_unlock(); + client_send_403 (client, "Icecast connection limit reached"); + /* don't be too eager as this is an imposed hard limit */ + thread_sleep (400000); + continue; + } - config = config_get_config(); - listener = config_get_listen_sock (config, client->con); + client_set_queue (client, header); - if (listener) - { - if (listener->shoutcast_compat) - node->shoutcast = 1; - if (listener->ssl && ssl_ok) - connection_uses_ssl (client->con); - if (listener->shoutcast_mount) - node->shoutcast_mount = strdup (listener->shoutcast_mount); - } + if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock)) + { global_unlock(); - config_release_config(); - - _add_request_queue (node); - stats_event_inc (NULL, "connections"); - duration = 5; + WARN0 ("failed to set tcp options on client connection, dropping"); + client_destroy (client); + continue; } - else + + header->len -= hdrsize; + memmove(header->data, header->data + hdrsize, header->len); + client_set_queue (client, header); + refbuf_release(header); + +// client->pos = hdrsize; + + config = config_get_config(); + listener = config_get_listen_sock (config, client->con); + + if (listener) { - if (_req_queue == NULL) - duration = 300; /* use longer timeouts when nothing waiting */ + if (listener->shoutcast_compat) + shoutcast = 1; + if (listener->ssl && ssl_ok) + connection_uses_ssl (client->con); + if (listener->shoutcast_mount) + shoutcast_mount = strdup (listener->shoutcast_mount); + } + global_unlock(); + config_release_config(); + + stats_event_inc (NULL, "connections"); + duration = 5; + + if (client->con->con_time + timeout <= time(NULL)) + continue; + + if (shoutcast) { + _handle_shoutcast_compatible (shoutcast, shoutcast_mount); + } else { + _handle_client (client); } - process_request_queue (timeout); } /* Give all the 
other threads notification to shut down */ @@ -1113,22 +1048,22 @@ static void _handle_get_request (client_t *client, char *passed_uri) if (uri != passed_uri) free (uri); } -static void _handle_shoutcast_compatible (client_queue_t *node) +static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount) { +/* + SHOUTCAST IS BROKEN +*/ char *http_compliant; int http_compliant_len = 0; http_parser_t *parser; ice_config_t *config = config_get_config (); - char *shoutcast_mount; - client_t *client = node->client; + client_t *client = NULL; //node->client; refbuf_t *refbuf = client->refbuf; - if (node->shoutcast_mount) - shoutcast_mount = node->shoutcast_mount; - else + if (!shoutcast_mount) shoutcast_mount = config->shoutcast_mount; - if (node->shoutcast == 1) + if (shoutcast == 1) { char *source_password; mount_proxy *mountinfo = config_find_mount (config, shoutcast_mount); @@ -1143,8 +1078,6 @@ static void _handle_shoutcast_compatible (client_queue_t *node) if ((hdrlen = util_find_eos_delim (client->refbuf, 0, HEADER_READ_LINE)) < 0) { client_destroy (client); free (source_password); - free (node->shoutcast_mount); - free (node); return; } @@ -1153,20 +1086,17 @@ static void _handle_shoutcast_compatible (client_queue_t *node) /* send this non-blocking but if there is only a partial write * then leave to header timeout */ sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n"); - node->offset -= hdrlen; - memmove (refbuf->data, refbuf->data + hdrlen, node->offset+1); - node->shoutcast = 2; + shoutcast = 2; /* we've checked the password, now send it back for reading headers */ - _add_request_queue (node); +// _add_request_queue (node); free (source_password); return; } - INFO1 ("password does not match \"%s\"", client->refbuf->data); + INFO1 ("password does not match \"%s\"", refbuf->data); + client_destroy (client); free (source_password); - free (node->shoutcast_mount); - free (node); return; } /* actually make a copy as we are dropping the config 
lock */ @@ -1175,7 +1105,7 @@ static void _handle_shoutcast_compatible (client_queue_t *node) /* Here we create a valid HTTP request based of the information that was passed in via the non-HTTP style protocol above. This means we can use some of our existing code to handle this case */ - http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset; +// http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset; http_compliant = (char *)calloc(1, http_compliant_len); snprintf (http_compliant, http_compliant_len, "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, refbuf->data); @@ -1183,15 +1113,6 @@ static void _handle_shoutcast_compatible (client_queue_t *node) httpp_initialize(parser, NULL); if (httpp_parse (parser, http_compliant, strlen(http_compliant))) { - /* we may have more than just headers, so prepare for it */ - if (node->stream_offset == node->offset) - refbuf->len = 0; - else - { - char *ptr = refbuf->data; - refbuf->len = node->offset - node->stream_offset; - memmove (ptr, ptr + node->stream_offset, refbuf->len); - } client->parser = parser; source_startup (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH); } @@ -1201,106 +1122,52 @@ static void _handle_shoutcast_compatible (client_queue_t *node) } free (http_compliant); free (shoutcast_mount); - free (node->shoutcast_mount); - free (node); return; } - -/* Connection thread. Here we take clients off the connection queue and check - * the contents provided. We set up the parser then hand off to the specific - * request handler. - */ -static void _handle_connection(void) +static int _handle_client (client_t *client) { - http_parser_t *parser; const char *rawuri; - client_queue_t *node; - - while (1) - { - node = _get_connection(); - if (! 
node) - break; - - client_t *client = node->client; - refbuf_t *refbuf = client->refbuf; - char *uri; - - - /* Check for special shoutcast compatability processing */ - if (node->shoutcast) - { - _handle_shoutcast_compatible (node); - continue; - } - - /* process normal HTTP headers */ - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - client->parser = parser; - if (!httpp_parse (parser, refbuf->data, node->offset)) - { - free (node); - ERROR0("HTTP request parsing failed"); - client_destroy (client); - continue; - } - - /* we may have more than just headers, so prepare for it */ - if (node->stream_offset == node->offset) { - refbuf->len = 0; - } else { - char *ptr = refbuf->data; - refbuf->len = node->offset - node->stream_offset; - memmove (ptr, ptr + node->stream_offset, refbuf->len); - } - - rawuri = httpp_getvar(parser, HTTPP_VAR_URI); + http_parser_t *parser = client->parser; + char *uri; - /* assign a port-based shoutcast mountpoint if required */ - if (node->shoutcast_mount && strcmp (rawuri, "/admin.cgi") == 0) - httpp_set_query_param (client->parser, "mount", node->shoutcast_mount); + rawuri = httpp_getvar(parser, HTTPP_VAR_URI); - free (node->shoutcast_mount); - free (node); - - if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) && - strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) { - ERROR0("Bad HTTP protocol detected"); - client_destroy (client); - continue; - } + if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) && + strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) { + ERROR0("Bad HTTP protocol detected"); + client_destroy (client); + return 0; + } - uri = util_normalise_uri(rawuri); + uri = util_normalise_uri(rawuri); - if (uri == NULL) - { - client_destroy (client); - continue; - } - - if (parser->req_type == httpp_req_source) { - _handle_source_request (client, uri); - } - else if (parser->req_type == httpp_req_post) { - _handle_post_request (client, uri); - } - else if (parser->req_type == 
httpp_req_stats) { - _handle_stats_request (client, uri); - } - else if (parser->req_type == httpp_req_get) { - _handle_get_request (client, uri); - } - else { - ERROR0("Wrong request type from client"); - client_send_400 (client, "unknown request"); - } + if (uri == NULL) + { + client_destroy (client); + return 0; + } - free(uri); + if (parser->req_type == httpp_req_source) { + _handle_source_request (client, uri); + } + else if (parser->req_type == httpp_req_post) { + _handle_post_request (client, uri); + } + else if (parser->req_type == httpp_req_stats) { + _handle_stats_request (client, uri); + } + else if (parser->req_type == httpp_req_get) { + _handle_get_request (client, uri); + } + else { + ERROR0("Wrong request type from client"); + client_send_400 (client, "unknown request"); } -} + free(uri); + return 1; +} /* called when listening thread is not checking for incoming connections */ int connection_setup_sockets (ice_config_t *config) -- 1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 16/31] Connection: extract connection_process
Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 185 ++++++++++++++++++++++++++++-------------------------- 1 files changed, 97 insertions(+), 88 deletions(-) diff --git a/src/connection.c b/src/connection.c index 5c9e96e..7641e8e 100644 --- a/src/connection.c +++ b/src/connection.c @@ -551,6 +551,101 @@ static connection_t *_accept_connection(int duration) return NULL; } +int connection_process (connection_t *con, int timeout) { + ice_config_t *config; + client_t *client = NULL; + listener_t *listener; + refbuf_t *header = NULL; + http_parser_t *parser = NULL; + int hdrsize = 0; + int shoutcast = 0; + char *shoutcast_mount = NULL; + + header = refbuf_new (PER_CLIENT_REFBUF_SIZE); + hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); + if (hdrsize < 0) + { + global_unlock(); + ERROR ("Header read failed"); + thread_sleep (400000); + return -1; + } + + /* process normal HTTP headers */ + parser = httpp_create_parser(); + httpp_initialize(parser, NULL); + if (!httpp_parse (parser, header->data, hdrsize)) + { + ERROR0("HTTP request parsing failed"); + client_destroy (client); + return -1; + } + + if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) + { + ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); + return -1; + } + + if (header->sync_point && (parser->req_type == httpp_req_source || + parser->req_type == httpp_req_post)) { + hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); + if (hdrsize < 0) { + INFO ("Header read failed"); + return hdrsize; + } + } + + global_lock(); + if (client_create (&client, con, parser) < 0) + { + global_unlock(); + client_send_403 (client, "Icecast connection limit reached"); + /* don't be too eager as this is an imposed hard limit */ + thread_sleep (400000); + return -1; + } + + if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock)) + { + global_unlock(); + WARN0 ("failed to set tcp options on client connection, dropping"); + client_destroy 
(client); + return -1; + } + + header->len -= hdrsize; + memmove(header->data, header->data + hdrsize, header->len); + client_set_queue (client, header); + refbuf_release(header); + + config = config_get_config(); + listener = config_get_listen_sock (config, client->con); + + if (listener) + { + if (listener->shoutcast_compat) + shoutcast = 1; + if (listener->ssl && ssl_ok) + connection_uses_ssl (client->con); + if (listener->shoutcast_mount) + shoutcast_mount = strdup (listener->shoutcast_mount); + } + global_unlock(); + config_release_config(); + + if (client->con->con_time + timeout <= time(NULL)) + return -1; + + stats_event_inc (NULL, "connections"); + + if (shoutcast) { + _handle_shoutcast_compatible (shoutcast, shoutcast_mount); + return 0; + } + return _handle_client (client); +} + void connection_accept_loop (void) { connection_t *con; @@ -572,94 +667,8 @@ void connection_accept_loop (void) continue; } - ice_config_t *config; - client_t *client = NULL; - listener_t *listener; - refbuf_t *header = NULL; - http_parser_t *parser = NULL; - int hdrsize = 0; - int shoutcast = 0; - char *shoutcast_mount = NULL; - - header = refbuf_new (PER_CLIENT_REFBUF_SIZE); - hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); - if (hdrsize < 0) - { - global_unlock(); - ERROR ("Header read failed"); - thread_sleep (400000); - continue; - } - - /* process normal HTTP headers */ - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - if (!httpp_parse (parser, header->data, hdrsize)) - { - ERROR0("HTTP request parsing failed"); - client_destroy (client); - continue; - } - - if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) - { - ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE)); - break; - } - - global_lock(); - if (client_create (&client, con, parser) < 0) - { - global_unlock(); - client_send_403 (client, "Icecast connection limit reached"); - /* don't be too eager as this is an imposed hard limit */ - thread_sleep (400000); - continue; 
- } - - client_set_queue (client, header); - - if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock)) - { - global_unlock(); - WARN0 ("failed to set tcp options on client connection, dropping"); - client_destroy (client); - continue; - } - - header->len -= hdrsize; - memmove(header->data, header->data + hdrsize, header->len); - client_set_queue (client, header); - refbuf_release(header); - -// client->pos = hdrsize; - - config = config_get_config(); - listener = config_get_listen_sock (config, client->con); - - if (listener) - { - if (listener->shoutcast_compat) - shoutcast = 1; - if (listener->ssl && ssl_ok) - connection_uses_ssl (client->con); - if (listener->shoutcast_mount) - shoutcast_mount = strdup (listener->shoutcast_mount); - } - global_unlock(); - config_release_config(); - - stats_event_inc (NULL, "connections"); - duration = 5; - - if (client->con->con_time + timeout <= time(NULL)) - continue; - - if (shoutcast) { - _handle_shoutcast_compatible (shoutcast, shoutcast_mount); - } else { - _handle_client (client); - } + if (connection_process (con, timeout) != -1) + duration = 5; } /* Give all the other threads notification to shut down */ -- 1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 17/31] Connection: add threads; this needs to go after the client_tag_t obsoleting patch
Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 150 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 files changed, 132 insertions(+), 18 deletions(-) diff --git a/src/connection.c b/src/connection.c index 7641e8e..118258d 100644 --- a/src/connection.c +++ b/src/connection.c @@ -82,19 +82,10 @@ #define SHOUTCAST_SOURCE_AUTH 1 #define ICECAST_SOURCE_AUTH 0 -typedef struct client_queue_tag { - client_t *client; - int offset; - int stream_offset; - int shoutcast; - char *shoutcast_mount; - struct client_queue_tag *next; -} client_queue_t; - -typedef struct _thread_queue_tag { - thread_type *thread_id; - struct _thread_queue_tag *next; -} thread_queue_t; +typedef struct connection_queue_tag { + connection_t *con; + struct connection_queue_tag *next; +} connection_queue_t; typedef struct { @@ -107,8 +98,11 @@ typedef struct static spin_t _connection_lock; static volatile unsigned long _current_id = 0; static int _initialized = 0; +static volatile int _connection_running = 0; +static volatile connection_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; +static cond_t *_connection_cond; +static thread_type *_connection_thread_id; -static volatile client_queue_t *_con_queue = NULL, **_con_queue_tail = &_con_queue; static int ssl_ok; #ifdef HAVE_OPENSSL static SSL_CTX *ssl_ctx; @@ -121,6 +115,8 @@ rwlock_t _source_shutdown_rwlock; static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount); static int _handle_client (client_t *client); +static int _connection_process (connection_t *con, int timeout); +static void *_connection_thread (void *arg); static int compare_ip (void *arg, void *a, void *b) { @@ -155,6 +151,12 @@ void connection_initialize(void) allowed_ip.contents = NULL; allowed_ip.file_mtime = 0; + _connection_running = 1; + /* (XXX)xaiki: need a way to make it go away on shutdown, ok for now */ + _connection_thread_id = thread_create ("Connection Thread", _connection_thread, + NULL, THREAD_DETACHED); 
+ _connection_cond = thread_cond_create (); + _initialized = 1; } @@ -162,12 +164,20 @@ void connection_shutdown(void) { if (!_initialized) return; + _connection_running = 0; + thread_cond_signal(_connection_cond); + DEBUG0 ("waiting for connection thread"); + thread_join(_connection_thread_id); + #ifdef HAVE_OPENSSL SSL_CTX_free (ssl_ctx); #endif if (banned_ip.contents) avl_tree_free (banned_ip.contents, free_filtered_ip); if (allowed_ip.contents) avl_tree_free (allowed_ip.contents, free_filtered_ip); + thread_cond_destroy (_connection_cond); + free (_connection_cond); + thread_cond_destroy(&global.shutdown_cond); thread_rwlock_destroy(&_source_shutdown_rwlock); thread_spin_destroy (&_connection_lock); @@ -551,7 +561,108 @@ static connection_t *_accept_connection(int duration) return NULL; } -int connection_process (connection_t *con, int timeout) { +connection_queue_t *_connection_node_new (connection_t *con) +{ + connection_queue_t *node; + if (!con) + return NULL; + + node = calloc (1, sizeof (connection_queue_t)); + if (!node) + return NULL; + + node->con = con; + + return node; +} + +/* add a connection to connection queue. At this point the connection + * has just been accepted, we push it to the queue and return so that + * we can keep getting connections in. + */ +static void _add_connection (connection_queue_t *node) +{ + WARN ("added connection"); + *_con_queue_tail = node; + _con_queue_tail = (volatile connection_queue_t **)&node->next; + + thread_cond_signal(_connection_cond); +} + +/* this returns queued clients for the connection thread. headers are + * already provided, but need to be parsed. 
+ */ +static connection_queue_t *_get_connection(void) +{ + connection_queue_t *node = NULL; + + /* common case, no new connections so don't bother taking locks */ + if (_con_queue) { + node = (connection_queue_t *)_con_queue; + _con_queue = node->next; + if (_con_queue == NULL) + _con_queue_tail = &_con_queue; + node->next = NULL; + } else { + INFO("sleeping"); + thread_cond_wait(_connection_cond); + INFO("awake"); + } + return node; +} + +static void _connection_node_destroy (connection_queue_t *node) { + INFO("destroying node"); + + if (node->con) + connection_close(node->con); + free(node); +} + +static void *_connection_thread (void *arg) +{ + connection_queue_t *node; + int err; + + WARN("Launched connection thread"); + while (_connection_running) + { + /* XXX(xaiki): this needs to wait on a fd, so we don't kill polar bears */ + node = _get_connection(); + INFO("got node"); + if (!node) { + continue; + } + err = _connection_process (node, 3000); + if (err > 0) { + free(node); + continue; + } + + switch (-err) { + case EAGAIN: /* put it again at the end of the queue */ + _add_connection (node); + break; + case EINPROGRESS: /* already handled */ + free(node); + break; + default: + ERROR ("droping node (%p, client => %p), error = %d", node, node->client, err); + _connection_node_destroy (node); + break; + } + } + + while (_con_queue) { + node = _get_connection(); + _connection_node_destroy (node); + } + + INFO0 ("Connection thread shutdown complete"); + return NULL; +} + +static int _connection_process (connection_t *con, int timeout) { ice_config_t *config; client_t *client = NULL; listener_t *listener; @@ -648,6 +759,7 @@ int connection_process (connection_t *con, int timeout) { void connection_accept_loop (void) { + connection_queue_t *node; connection_t *con; ice_config_t *config; int duration = 300; @@ -667,8 +779,10 @@ void connection_accept_loop (void) continue; } - if (connection_process (con, timeout) != -1) - duration = 5; + /* add connection async to 
the connection queue, then the + * connection loop will do all the dirty work */ + node =_connection_node_new (con); + _add_connection (node); } /* Give all the other threads notification to shut down */ @@ -713,7 +827,7 @@ int connection_complete_source (source_t *source, int response) source->client = NULL; } WARN1("Content-type \"%s\" not supported, dropping source", contenttype); - return -1; + return -EINPROGRESS; } } else -- 1.7.1
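For readers following the queue logic in this patch: `_add_connection` and `_get_connection` implement a singly linked FIFO with a tail pointer-to-pointer. A minimal, single-threaded sketch of that idiom is below. The names `node_t`, `queue_t`, `queue_push` and `queue_pop` are illustrative assumptions, not Icecast identifiers, and locking is left out on purpose (the patch as posted does not lock the queue either).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for connection_queue_t: payload plus link. */
typedef struct node {
    int id;
    struct node *next;
} node_t;

/* head is the first node; tail points at the pointer the next push fills. */
typedef struct {
    node_t *head;
    node_t **tail;
} queue_t;

static void queue_init(queue_t *q)
{
    q->head = NULL;
    q->tail = &q->head;
}

/* Append in O(1): write through tail, then move tail to the new node's link. */
static void queue_push(queue_t *q, node_t *n)
{
    n->next = NULL;
    *q->tail = n;
    q->tail = &n->next;
}

/* Detach and return the first node, or NULL when the queue is empty. */
static node_t *queue_pop(queue_t *q)
{
    node_t *n = q->head;
    if (n == NULL)
        return NULL;
    q->head = n->next;
    if (q->head == NULL)
        q->tail = &q->head;   /* queue drained: tail must point at head again */
    n->next = NULL;
    return n;
}
```

Resetting the tail when the queue drains is the step that is easiest to forget; without it the next push would write through a pointer into a node that has already been handed out.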
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 18/31] connection_process takes node, con_q_t gets refbuf, and con_t timeout, util updated
This allows all to be re-entrant. Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 27 ++++++++++++++++++--------- src/connection.h | 1 + src/util.c | 48 ++++++++++++++++++++++++++++++++++-------------- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/src/connection.c b/src/connection.c index 118258d..9532bab 100644 --- a/src/connection.c +++ b/src/connection.c @@ -84,6 +84,7 @@ typedef struct connection_queue_tag { connection_t *con; + refbuf_t *refbuf; struct connection_queue_tag *next; } connection_queue_t; @@ -115,7 +116,7 @@ rwlock_t _source_shutdown_rwlock; static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount); static int _handle_client (client_t *client); -static int _connection_process (connection_t *con, int timeout); +static int _connection_process (connection_queue_t *node); static void *_connection_thread (void *arg); static int compare_ip (void *arg, void *a, void *b) @@ -633,7 +634,7 @@ static void *_connection_thread (void *arg) if (!node) { continue; } - err = _connection_process (node, 3000); + err = _connection_process (node); if (err > 0) { free(node); continue; @@ -662,18 +663,20 @@ static void *_connection_thread (void *arg) return NULL; } -static int _connection_process (connection_t *con, int timeout) { +static int _connection_process (connection_queue_t *node) { ice_config_t *config; client_t *client = NULL; listener_t *listener; - refbuf_t *header = NULL; + refbuf_t *header; http_parser_t *parser = NULL; int hdrsize = 0; int shoutcast = 0; char *shoutcast_mount = NULL; - header = refbuf_new (PER_CLIENT_REFBUF_SIZE); - hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); + if (!node->refbuf) + node->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE); + header = node->refbuf; + hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE); if (hdrsize < 0) { global_unlock(); @@ -700,7 +703,7 @@ static int _connection_process (connection_t *con, int timeout) { if 
(header->sync_point && (parser->req_type == httpp_req_source || parser->req_type == httpp_req_post)) { - hdrsize = util_read_header (con, header, HEADER_READ_ENTIRE); + hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE); if (hdrsize < 0) { INFO ("Header read failed"); return hdrsize; @@ -708,7 +711,7 @@ static int _connection_process (connection_t *con, int timeout) { } global_lock(); - if (client_create (&client, con, parser) < 0) + if (client_create (&client, node->con, parser) < 0) { global_unlock(); client_send_403 (client, "Icecast connection limit reached"); @@ -745,8 +748,12 @@ static int _connection_process (connection_t *con, int timeout) { global_unlock(); config_release_config(); - if (client->con->con_time + timeout <= time(NULL)) +/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. */ + if (client->con->con_timeout <= time(NULL)) { + WARN("there might be a bug if you see this"); + client_destroy (client); return -1; + } stats_event_inc (NULL, "connections"); @@ -779,6 +786,8 @@ void connection_accept_loop (void) continue; } + con->con_timeout = time(NULL) + timeout; + /* add connection async to the connection queue, then the * connection loop will do all the dirty work */ node =_connection_node_new (con); diff --git a/src/connection.h b/src/connection.h index 80a2b10..42483cc 100644 --- a/src/connection.h +++ b/src/connection.h @@ -34,6 +34,7 @@ typedef struct connection_tag unsigned long id; time_t con_time; + time_t con_timeout; time_t discon_time; uint64_t sent_bytes; diff --git a/src/util.c b/src/util.c index 94b06f6..c74d1ce 100644 --- a/src/util.c +++ b/src/util.c @@ -142,9 +142,18 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags) return -ENOENT; } - config = config_get_config(); - header_timeout = config->header_timeout*1000; - config_release_config(); + if (con->con_timeout == 0) { + DEBUG0 ("NO TIMEOUT"); + config = config_get_config(); + header_timeout = 
config->header_timeout*1000; + config_release_config(); + } else if (time(NULL) < con->con_timeout) { + DEBUG3 ("STILL HAVE TIME (%d - %d = %d)", time(NULL), con->con_timeout, con->con_timeout - time(NULL)); + header_timeout = 1000; + } else { + DEBUG0 ("HUM BROKEN PIPE"); + return -EPIPE; + } if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) { INFO("util_timed_wait_for_fd <= 0"); @@ -152,24 +161,19 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags) } if (refbuf->sync_point < 0) { - DEBUG ("REENTRING, got and old non-resolved sync"); + DEBUG0 ("REENTRING, got and old non-resolved sync"); pos = -refbuf->sync_point; } else if (refbuf->sync_point > 0) { - DEBUG ("REENTRING, got and old resolved sync"); + DEBUG0 ("REENTRING, got and old resolved sync"); endpos = pos = refbuf->sync_point; } else { - DEBUG ("FIRST TIME, no sync"); + DEBUG0 ("FIRST TIME, no sync"); pos = 0; } - while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) >= 0) { - if (bytes == 0) - con->error = 1; - if (bytes == -1 && !sock_recoverable (sock_error())) - con->error = 1; - - DEBUG("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos); - /* this is used for re-entrance, so we get a new chance to read */ + while ((bytes = sock_read_bytes (con->sock, refbuf->data + pos, refbuf->len - pos)) > 0) { + DEBUG4 ("read %d, %d '%s'\nfrom pos '%s'", bytes, endpos, refbuf->data, refbuf->data + pos); + /* this is used for re-entrance, so we get a new chance to read */ if (endpos == -ENOENT) endpos = util_find_eos_delim (refbuf, -(bytes + pos), flags); if (endpos != -ENOENT) { @@ -198,8 +202,24 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags) refbuf->sync_point = -pos; return -EAGAIN; } + + if (time(NULL) > con->con_timeout) + goto out_FAIL; + } + + if (bytes == 0 || ! 
sock_recoverable (sock_error())) { + WARN ("Connection error"); + con->error = 1; + return -EPIPE; + } + + if (time(NULL) < con->con_timeout) { + DEBUG3 ("STILL HAVE TIME pos == (%d)???bytes = %d??? data is %s", pos, bytes, refbuf->data); + DEBUG3 ("OUT, %p, refbuf->len = %d, refbuf->syncpoint = %d", refbuf, refbuf->len, refbuf->sync_point); + return -EAGAIN; } +out_FAIL: WARN("Couldn't find enough data, pos = %d, data = %s, entire = %d.\n", pos, refbuf->data, flags); refbuf->sync_point = 0; return -ENOENT; -- 1.7.1
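The re-entrancy this patch is after can be illustrated without sockets: the read position is kept in per-connection state, and the caller is told to come back with `-EAGAIN` until the blank line that ends the header has arrived. The sketch below is a simplified model under stated assumptions; `hdrbuf_t` and `header_feed` are hypothetical names, the state is a plain byte count rather than the signed `sync_point` encoding used in `util_read_header`, and the `con_timeout` deadline is not modelled.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

#define HDR_BUF_SIZE 256

/* Hypothetical per-connection state; plays the role of refbuf + sync_point. */
typedef struct {
    char data[HDR_BUF_SIZE];
    size_t used;      /* bytes received so far, kept between calls */
} hdrbuf_t;

/* Append one chunk, then report progress:
 *   > 0      length of the complete header, including the blank line
 *   -EAGAIN  header incomplete, call again when more data arrives
 *   -ENOMEM  the chunk does not fit in the remaining buffer space */
static int header_feed(hdrbuf_t *b, const char *chunk, size_t n)
{
    size_t i;

    if (n > sizeof(b->data) - b->used)
        return -ENOMEM;
    memcpy(b->data + b->used, chunk, n);
    b->used += n;

    /* Rescan from the start for simplicity; the delimiter may span chunks. */
    for (i = 0; i + 4 <= b->used; i++) {
        if (memcmp(b->data + i, "\r\n\r\n", 4) == 0)
            return (int)(i + 4);
    }
    return -EAGAIN;
}
```

Because all progress lives in the buffer struct, the caller can put the connection back on a queue after `-EAGAIN` and serve other connections in the meantime.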
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 19/31] connection: process takes node, not con+args, cleanup error handeling (propagate)
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   19 +++++++++----------
 1 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 9532bab..0e75fab 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -671,6 +671,7 @@ static int _connection_process (connection_queue_t *node) {
     http_parser_t *parser = NULL;
     int hdrsize = 0;
     int shoutcast = 0;
+    int err;
     char *shoutcast_mount = NULL;

     if (!node->refbuf)
@@ -679,26 +680,24 @@ static int _connection_process (connection_queue_t *node) {
     hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE);
     if (hdrsize < 0)
     {
-        global_unlock();
         ERROR ("Header read failed");
-        thread_sleep (400000);
-        return -1;
+        return hdrsize;
     }

     /* process normal HTTP headers */
     parser = httpp_create_parser();
     httpp_initialize(parser, NULL);
-    if (!httpp_parse (parser, header->data, hdrsize))
+    err = httpp_parse (parser, header->data, hdrsize);
+    if (err == 0)
     {
         ERROR0("HTTP request parsing failed");
-        client_destroy (client);
-        return -1;
+        return -EINVAL;
     }

     if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
     {
         ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
-        return -1;
+        return err;
     }

     if (header->sync_point && (parser->req_type == httpp_req_source ||
@@ -711,13 +710,13 @@ static int _connection_process (connection_queue_t *node) {
     }

     global_lock();
-    if (client_create (&client, node->con, parser) < 0)
+    err = client_create (&client, node->con, parser);
+    if (err < 0)
     {
         global_unlock();
         client_send_403 (client, "Icecast connection limit reached");
         /* don't be too eager as this is an imposed hard limit */
-        thread_sleep (400000);
-        return -1;
+        return err;
     }

     if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock))
--
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 20/31] connection: add parser to connection, and use read_headers changes.
We now return as soon as we have a header, so if we want more (which we can
only know once we have actually parsed it), we re-enter later.

Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   22 ++++++++++++++--------
 1 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 0e75fab..ab8dd4f 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -85,6 +85,7 @@
 typedef struct connection_queue_tag {
     connection_t *con;
     refbuf_t *refbuf;
+    http_parser_t *parser;
     struct connection_queue_tag *next;
 } connection_queue_t;

@@ -685,17 +686,20 @@ static int _connection_process (connection_queue_t *node) {
     }

     /* process normal HTTP headers */
-    parser = httpp_create_parser();
-    httpp_initialize(parser, NULL);
+    if (node->parser) {
+        parser = node->parser;
+    } else {
+        parser = node->parser = httpp_create_parser();
+        httpp_initialize(parser, NULL);
+    }
+
     err = httpp_parse (parser, header->data, hdrsize);
-    if (err == 0)
-    {
+    if (err == 0) {
         ERROR0("HTTP request parsing failed");
         return -EINVAL;
     }

-    if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
-    {
+    if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) {
         ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
         return err;
     }
@@ -728,8 +732,10 @@ static int _connection_process (connection_queue_t *node) {
     }

     header->len -= hdrsize;
-    memmove(header->data, header->data + hdrsize, header->len);
-    client_set_queue (client, header);
+    if (header->len) {
+        memmove(header->data, header->data + hdrsize, header->len);
+        client_set_queue (client, header);
+    }
     refbuf_release(header);

     config = config_get_config();
--
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 21/31] Add Client in connection_queue_t
extract setup code to it's own function and use ->con from node. Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 85 +++++++++++++++++++++++++++++++++++++---------------- 1 files changed, 59 insertions(+), 26 deletions(-) diff --git a/src/connection.c b/src/connection.c index ab8dd4f..c66a240 100644 --- a/src/connection.c +++ b/src/connection.c @@ -86,6 +86,7 @@ typedef struct connection_queue_tag { connection_t *con; refbuf_t *refbuf; http_parser_t *parser; + client_t *client; struct connection_queue_tag *next; } connection_queue_t; @@ -616,8 +617,19 @@ static connection_queue_t *_get_connection(void) static void _connection_node_destroy (connection_queue_t *node) { INFO("destroying node"); - if (node->con) - connection_close(node->con); + if (node->client) { + client_destroy(node->client); /* destroys con, parser, refbuf */ + } else { + if (node->parser) { + httpp_destroy(node->parser); + } + if (node->refbuf) { + refbuf_release(node->refbuf); + } + if (node->con) { + connection_close(node->con); + } + } free(node); } @@ -664,9 +676,43 @@ static void *_connection_thread (void *arg) return NULL; } +static int connection_client_setup (connection_queue_t *node) { + int err; + + global_lock(); + err = client_create (&node->client, node->con, node->parser); + if (err < 0) { + client_send_403 (node->client, "Icecast connection limit reached"); + /* don't be too eager as this is an imposed hard limit */ + goto out_fail; + } + + err = -EINVAL; + if (sock_set_blocking (node->con->sock, 0) || sock_set_nodelay (node->con->sock)) { + WARN0 ("failed to set tcp options on client connection, dropping"); + goto out_destroy_client; + } + +/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. 
*/ + err = -ENOENT; + if (node->con->con_timeout <= time(NULL)) { + WARN("there might be a bug if you see this"); + goto out_destroy_client; + } + + global_unlock(); + + return 0; + +out_destroy_client: + client_destroy (node->client); +out_fail: + global_unlock(); + return err; +} + static int _connection_process (connection_queue_t *node) { ice_config_t *config; - client_t *client = NULL; listener_t *listener; refbuf_t *header; http_parser_t *parser = NULL; @@ -713,50 +759,37 @@ static int _connection_process (connection_queue_t *node) { } } - global_lock(); - err = client_create (&client, node->con, parser); - if (err < 0) - { - global_unlock(); - client_send_403 (client, "Icecast connection limit reached"); - /* don't be too eager as this is an imposed hard limit */ - return err; - } - - if (sock_set_blocking (client->con->sock, 0) || sock_set_nodelay (client->con->sock)) - { - global_unlock(); - WARN0 ("failed to set tcp options on client connection, dropping"); - client_destroy (client); - return -1; + if (! node->client) { + err = connection_client_setup (node); + if (err < 0) + return err; } header->len -= hdrsize; if (header->len) { memmove(header->data, header->data + hdrsize, header->len); - client_set_queue (client, header); + client_set_queue (node->client, header); } refbuf_release(header); config = config_get_config(); - listener = config_get_listen_sock (config, client->con); + listener = config_get_listen_sock (config, node->con); if (listener) { if (listener->shoutcast_compat) shoutcast = 1; if (listener->ssl && ssl_ok) - connection_uses_ssl (client->con); + connection_uses_ssl (node->con); if (listener->shoutcast_mount) shoutcast_mount = strdup (listener->shoutcast_mount); } - global_unlock(); config_release_config(); /* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. 
*/ - if (client->con->con_timeout <= time(NULL)) { + if (node->con->con_timeout <= time(NULL)) { WARN("there might be a bug if you see this"); - client_destroy (client); + client_destroy (node->client); return -1; } @@ -766,7 +799,7 @@ static int _connection_process (connection_queue_t *node) { _handle_shoutcast_compatible (shoutcast, shoutcast_mount); return 0; } - return _handle_client (client); + return _handle_client (node->client); } void connection_accept_loop (void) -- 1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 22/31] Fix Shoutcast, Move it to a one-pass process
(our function is re-entrant so timeouts are almost automatically handled) test if we have a shoutcast connection. stage1, read password, validate it, and prepare in-refbuf with POST (no auth) get into the normal POST-like loop Signed-off-by: Niv Sardi <nsardi at smartjog.com> --- src/connection.c | 180 ++++++++++++++++++++++++------------------------------ 1 files changed, 79 insertions(+), 101 deletions(-) diff --git a/src/connection.c b/src/connection.c index c66a240..9269ad6 100644 --- a/src/connection.c +++ b/src/connection.c @@ -116,8 +116,8 @@ cache_file_contents banned_ip, allowed_ip; rwlock_t _source_shutdown_rwlock; -static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount); static int _handle_client (client_t *client); +static int _handle_shoutcast_stage1 (connection_queue_t *node, char *shoutcast_mount, mount_proxy *mountinfo); static int _connection_process (connection_queue_t *node); static void *_connection_thread (void *arg); @@ -711,19 +711,52 @@ out_fail: return err; } +/* we don't need to clean up on err, as we'll go through the node struct and clean all we have inside */ static int _connection_process (connection_queue_t *node) { - ice_config_t *config; - listener_t *listener; refbuf_t *header; http_parser_t *parser = NULL; int hdrsize = 0; int shoutcast = 0; int err; char *shoutcast_mount = NULL; + mount_proxy *mountinfo; + + ice_config_t *config; + listener_t *listener; if (!node->refbuf) node->refbuf = refbuf_new (PER_CLIENT_REFBUF_SIZE); header = node->refbuf; + + { /* this code tests for shoutcastness */ + config = config_get_config(); + listener = config_get_listen_sock (config, node->con); + + if (listener) { + WARN("listner"); + if (listener->shoutcast_compat) + shoutcast = 1; + if (listener->ssl && ssl_ok) + connection_uses_ssl (node->con); + if (listener->shoutcast_mount) { + shoutcast_mount = strdup (listener->shoutcast_mount); + } else { + shoutcast_mount = config->shoutcast_mount; + } + } + + WARN("shoutcast %d, 
mount %s", shoutcast, shoutcast_mount); + + mountinfo = config_find_mount (config, shoutcast_mount); + config_release_config(); + } + + if (shoutcast && !header->sync_point) { /* stage2 is actually handled by generic code */ + err = _handle_shoutcast_stage1 (node, shoutcast_mount, mountinfo); + if (err < 0) + return err; + } + hdrsize = util_read_header (node->con, header, HEADER_READ_ENTIRE); if (hdrsize < 0) { @@ -763,42 +796,19 @@ static int _connection_process (connection_queue_t *node) { err = connection_client_setup (node); if (err < 0) return err; - } - - header->len -= hdrsize; - if (header->len) { - memmove(header->data, header->data + hdrsize, header->len); - client_set_queue (node->client, header); - } - refbuf_release(header); - - config = config_get_config(); - listener = config_get_listen_sock (config, node->con); - if (listener) - { - if (listener->shoutcast_compat) - shoutcast = 1; - if (listener->ssl && ssl_ok) - connection_uses_ssl (node->con); - if (listener->shoutcast_mount) - shoutcast_mount = strdup (listener->shoutcast_mount); - } - config_release_config(); - -/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. 
*/ - if (node->con->con_timeout <= time(NULL)) { - WARN("there might be a bug if you see this"); - client_destroy (node->client); - return -1; + header->len -= hdrsize; + if (header->len) { + memmove(header->data, header->data + hdrsize, header->len); + client_set_queue (node->client, header); + } + refbuf_release(header); } stats_event_inc (NULL, "connections"); - if (shoutcast) { - _handle_shoutcast_compatible (shoutcast, shoutcast_mount); - return 0; - } + WARN("shoutcast = %d", shoutcast); + return _handle_client (node->client); } @@ -1218,81 +1228,49 @@ static void _handle_get_request (client_t *client, char *passed_uri) if (uri != passed_uri) free (uri); } -static void _handle_shoutcast_compatible (int shoutcast, char *shoutcast_mount) +static int _handle_shoutcast_stage1 (connection_queue_t *node, char *shoutcast_mount, mount_proxy *mountinfo) { -/* - SHOUTCAST IS BROKEN -*/ - char *http_compliant; - int http_compliant_len = 0; - http_parser_t *parser; - ice_config_t *config = config_get_config (); - client_t *client = NULL; //node->client; - refbuf_t *refbuf = client->refbuf; + refbuf_t *refbuf = node->refbuf; + char *source_password; + int err, passlen; - if (!shoutcast_mount) - shoutcast_mount = config->shoutcast_mount; - - if (shoutcast == 1) - { - char *source_password; - mount_proxy *mountinfo = config_find_mount (config, shoutcast_mount); - int hdrlen; + WARN ("IN"); - if (mountinfo && mountinfo->password) - source_password = strdup (mountinfo->password); - else - source_password = strdup (config->source_password); + if (mountinfo && mountinfo->password) { + source_password = strdup (mountinfo->password); + } else { + ice_config_t *config = config_get_config (); + source_password = strdup (config->source_password); config_release_config(); + } - if ((hdrlen = util_find_eos_delim (client->refbuf, 0, HEADER_READ_LINE)) < 0) { - client_destroy (client); - free (source_password); - return; - } + passlen = util_read_header (node->con, refbuf, 
HEADER_READ_LINE); + if (passlen <= 0) { + WARN ("HEADER READ FAILED"); + err = passlen; + goto out_FAIL; + } - if (memmem (refbuf->data, hdrlen, source_password, strlen(source_password)) != NULL) { - client->respcode = 200; - /* send this non-blocking but if there is only a partial write - * then leave to header timeout */ - sock_write (client->con->sock, "OK2\r\nicy-caps:11\r\n\r\n"); - shoutcast = 2; - /* we've checked the password, now send it back for reading headers */ -// _add_request_queue (node); - free (source_password); - return; - } + if (memmem (refbuf->data, passlen, source_password, strlen(source_password)) == NULL) { + INFO ("password does not match (%ld) \"%s\" (%d) \"%s\"", + strlen(source_password), source_password, passlen, refbuf->data); + err = -ENOENT; + goto out_FAIL; + } - INFO1 ("password does not match \"%s\"", refbuf->data); +/* send this non-blocking but if there is only a partial write + * then leave to header timeout */ + sock_write (node->con->sock, "OK2\r\nicy-caps:11\r\n\r\n"); - client_destroy (client); - free (source_password); - return; - } - /* actually make a copy as we are dropping the config lock */ - shoutcast_mount = strdup (shoutcast_mount); - config_release_config(); - /* Here we create a valid HTTP request based of the information - that was passed in via the non-HTTP style protocol above. 
This - means we can use some of our existing code to handle this case */ -// http_compliant_len = 20 + strlen (shoutcast_mount) + node->offset; - http_compliant = (char *)calloc(1, http_compliant_len); - snprintf (http_compliant, http_compliant_len, - "SOURCE %s HTTP/1.0\r\n%s", shoutcast_mount, refbuf->data); - parser = httpp_create_parser(); - httpp_initialize(parser, NULL); - if (httpp_parse (parser, http_compliant, strlen(http_compliant))) - { - client->parser = parser; - source_startup (client, shoutcast_mount, SHOUTCAST_SOURCE_AUTH); - } - else { - httpp_destroy (parser); - client_destroy (client); - } - free (http_compliant); - free (shoutcast_mount); - return; + refbuf->sync_point = snprintf (refbuf->data, refbuf->len, "POST %s HTTP/1.0\r\n", shoutcast_mount); + + /* we've checked the password, now send it back for reading headers */ + free (source_password); + return 0; + +out_FAIL: + free (source_password); + return err; } static int _handle_client (client_t *client) -- 1.7.1
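Stage 1 as described in this patch boils down to: take the first line from the source client as the password, validate it, then overwrite the header buffer with a `POST <mount> HTTP/1.0` request line so the generic code can read the remaining headers. A hedged sketch of just that step follows. `shoutcast_stage1` here is a hypothetical stand-alone helper, not the function in the patch; it works on plain strings, uses an exact comparison where the patch does a substring search with `memmem()`, and checks the `snprintf` result for truncation, which the patch does not.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper mirroring stage 1: `line` is the first line sent by the
 * source client (the password), `buf` is the header buffer to be rewritten.
 * Returns the number of bytes written to buf (where header reading resumes),
 * -ENOENT on a wrong password, or -ENOMEM if buf is too small. */
static int shoutcast_stage1(char *buf, size_t buflen, const char *line,
                            const char *password, const char *mount)
{
    size_t linelen = strcspn(line, "\r\n");   /* ignore the line terminator */
    int n;

    /* Exact match; the patch itself does a substring search with memmem(). */
    if (linelen != strlen(password) || memcmp(line, password, linelen) != 0)
        return -ENOENT;

    n = snprintf(buf, buflen, "POST %s HTTP/1.0\r\n", mount);
    if (n < 0 || (size_t)n >= buflen)
        return -ENOMEM;
    return n;
}
```

The returned length corresponds to the value the patch stores in `refbuf->sync_point`, so that the next header read appends after the synthetic request line.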
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 23/31] connection: duration should be bigger
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    5 +++--
 1 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 9269ad6..802a8ad 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -817,7 +817,7 @@ void connection_accept_loop (void)
     connection_queue_t *node;
     connection_t *con;
     ice_config_t *config;
-    int duration = 300;
+    int duration = 3000;
     int timeout = 0;

     config = config_get_config ();
@@ -830,7 +830,7 @@ void connection_accept_loop (void)
         con = _accept_connection (duration);

         if (!con) {
-            duration = 300; /* use longer timeouts when nothing waiting */
+            duration = 30000; /* use longer timeouts when nothing waiting */
             continue;
         }

@@ -840,6 +840,7 @@ void connection_accept_loop (void)
          * connection loop will do all the dirty work */
         node =_connection_node_new (con);
         _add_connection (node);
+        duration = 3000;
     }

     /* Give all the other threads notification to shut down */
--
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 24/31] connection: reorder logic in connection_setup_sockets
Make this all more readable; exit points are now clearer.
---
 src/connection.c |   39 +++++++++++++++++++--------------------
 1 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 802a8ad..5024690 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -538,28 +538,27 @@ static connection_t *_accept_connection(int duration)
     ip = (char *)malloc(MAX_ADDR_LEN);

     sock = sock_accept(serversock, ip, MAX_ADDR_LEN);
-    if (sock != SOCK_ERROR)
-    {
-        connection_t *con = NULL;
-        /* Make any IPv4 mapped IPv6 address look like a normal IPv4 address */
-        if (strncmp (ip, "::ffff:", 7) == 0)
-            memmove (ip, ip+7, strlen (ip+7)+1);
-
-        if (accept_ip_address (ip)) {
-            con = connection_create (sock, serversock, ip);
-            if (con)
-                return con;
-        }
-        sock_close (sock);
+    if (sock == SOCK_ERROR) {
+        free (ip);
+        if (sock_recoverable(sock_error()))
+            return NULL;
+
+        WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
+        thread_sleep (500000);
+        return NULL;
     }
-    else
-    {
-        if (!sock_recoverable(sock_error()))
-        {
-            WARN2("accept() failed with error %d: %s", sock_error(), strerror(sock_error()));
-            thread_sleep (500000);
-        }
+
+    connection_t *con = NULL;
+    /* Make any IPv4 mapped IPv6 address look like a normal IPv4 address */
+    if (strncmp (ip, "::ffff:", 7) == 0)
+        memmove (ip, ip+7, strlen (ip+7)+1);
+
+    if (accept_ip_address (ip)) {
+        con = connection_create (sock, serversock, ip);
+        if (con)
+            return con;
     }
+    sock_close (sock);
     free(ip);
     return NULL;
 }
--
1.7.1
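The address normalisation that this patch moves out of the nested branch can be read in isolation. Below is a small sketch of it as a separate function; `strip_v4_mapped` is a hypothetical name chosen for illustration and does not exist in the Icecast tree.

```c
#include <assert.h>
#include <string.h>

/* Rewrite "::ffff:a.b.c.d" in place as "a.b.c.d"; leave other addresses alone. */
static void strip_v4_mapped(char *ip)
{
    static const char prefix[] = "::ffff:";
    size_t plen = sizeof(prefix) - 1;

    if (strncmp(ip, prefix, plen) == 0)
        memmove(ip, ip + plen, strlen(ip + plen) + 1);  /* +1 copies the NUL */
}
```

`memmove` rather than `memcpy` is needed here because source and destination overlap within the same buffer.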
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 25/31] client_send_400, print 400 message
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/client.c |    1 +
 1 files changed, 1 insertions(+), 0 deletions(-)

diff --git a/src/client.c b/src/client.c
index f482c6b..c71102c 100644
--- a/src/client.c
+++ b/src/client.c
@@ -182,6 +182,7 @@ int client_read_bytes (client_t *client, void *buf, unsigned len)

 void client_send_400(client_t *client, char *message)
 {
+    ERROR("400 -> %s", message);
     snprintf (client->refbuf->data, PER_CLIENT_REFBUF_SIZE,
             "HTTP/1.0 400 Bad Request\r\n"
             "Content-Type: text/html\r\n\r\n"
--
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 26/31] Connection: handle_client returns err to client (via client_send_400)
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   16 +++++++---------
 1 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 5024690..5627933 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1283,17 +1283,15 @@ static int _handle_client (client_t *client)

     if (strcmp("ICE", httpp_getvar(parser, HTTPP_VAR_PROTOCOL)) &&
         strcmp("HTTP", httpp_getvar(parser, HTTPP_VAR_PROTOCOL))) {
-        ERROR0("Bad HTTP protocol detected");
-        client_destroy (client);
-        return 0;
+        client_send_400 (client, "Bad HTTP protocol");
+        return -EINPROGRESS;
     }

     uri = util_normalise_uri(rawuri);

-    if (uri == NULL)
-    {
-        client_destroy (client);
-        return 0;
+    if (uri == NULL) {
+        client_send_400 (client, "Couldn't normalize uri");
+        return -EINPROGRESS;
     }

     if (parser->req_type == httpp_req_source) {
@@ -1309,8 +1307,8 @@ static int _handle_client (client_t *client)
         _handle_get_request (client, uri);
     }
     else {
-        ERROR0("Wrong request type from client");
-        client_send_400 (client, "unknown request");
+        client_send_400 (client, "Wrong request type from client");
+        return -EINPROGRESS;
     }

     free(uri);
--
1.7.1
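With `_handle_client` now returning `-EINPROGRESS` after it has answered the client itself, the return-code convention used by the connection thread in this series can be summarised as a small mapping. The sketch below is an illustration only: `node_action_t` and `node_action` are hypothetical names, and it assumes that a return value of 0 counts as success, whereas the loop as posted tests `err > 0`.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical names for what the connection thread does with a node. */
typedef enum {
    NODE_RELEASE,   /* free the node only; the connection is handled elsewhere */
    NODE_REQUEUE,   /* not enough data yet, try again later */
    NODE_DESTROY    /* real failure: close the connection and free the node */
} node_action_t;

/* Map a _connection_process-style return value to an action.
 * Assumption: 0 counts as success here, whereas the posted loop tests err > 0. */
static node_action_t node_action(int err)
{
    if (err >= 0)
        return NODE_RELEASE;

    switch (-err) {
    case EAGAIN:
        return NODE_REQUEUE;
    case EINPROGRESS:   /* a reply was already sent, e.g. via client_send_400 */
        return NODE_RELEASE;
    default:
        return NODE_DESTROY;
    }
}
```

Keeping the mapping in one place makes it easier to see that every negative errno other than the two special cases ends in full cleanup of the node.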
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 27/31] connection: more coments for remy
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    4 +++-
 1 files changed, 3 insertions(+), 1 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 5627933..45a0df9 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -654,7 +654,8 @@ static void *_connection_thread (void *arg)

         switch (-err) {
         case EAGAIN: /* put it again at the end of the queue */
-            _add_connection (node);
+            DEBUG1 ("Re-adding connection (%p) on top of queue", node);
+            _add_connection (node); /* XXX concurency */
             break;
         case EINPROGRESS: /* already handled */
             free(node);
@@ -777,6 +778,7 @@ static int _connection_process (connection_queue_t *node) {
         return -EINVAL;
     }

+    /* XXX what happens when error in http ??? is err set ? */
     if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE)) {
         ERROR("Error(%s)", httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
         return err;
--
1.7.1
[Icecast-dev] [PATCH 28/31] Big Comments cleanup
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   13 ++++++++++---
 src/util.c       |   12 ++++++------
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 45a0df9..8d2d906 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -426,6 +426,7 @@ connection_t *connection_create (sock_t sock, sock_t serversock, char *ip)
         con->send = connection_send;
     }
+    DEBUG3 ("creating con = %p for fd =%d on %d.", con, sock, serversock);
     return con;
 }
@@ -575,6 +576,7 @@ connection_queue_t *_connection_node_new (connection_t *con)
     node->con = con;
+    DEBUG2 ("created connection node %p -> %p", node, con);
     return node;
 }
@@ -606,15 +608,15 @@ static connection_queue_t *_get_connection(void)
             _con_queue_tail = &_con_queue;
         node->next = NULL;
     } else {
-        INFO("sleeping");
+        DEBUG ("connection thread sleeping");
         thread_cond_wait(_connection_cond);
-        INFO("awake");
+        DEBUG ("connection thread awake");
     }
     return node;
 }
 static void _connection_node_destroy (connection_queue_t *node) {
-    INFO("destroying node");
+    DEBUG2 ("destroying node %p -> %p", node, node->con);
     if (node->client) {
         client_destroy(node->client); /* destroys con, parser, refbuf */
@@ -648,6 +650,7 @@ static void *_connection_thread (void *arg)
         }
         err = _connection_process (node);
         if (err > 0) {
+            DEBUG1 ("AOK, dropping node (%p)", node);
             free(node);
             continue;
         }
@@ -658,6 +661,7 @@ static void *_connection_thread (void *arg)
             _add_connection (node); /* XXX concurency */
             break;
         case EINPROGRESS: /* already handled */
+            DEBUG1 ("Node (%p) is handled internaly", node);
             free(node);
             break;
         default:
@@ -840,6 +844,7 @@ void connection_accept_loop (void)
             /* add connection async to the connection queue, then the
              * connection loop will do all the dirty work */
             node =_connection_node_new (con);
+            INFO1 ("Got new connection %p, adding to queue", node);
             _add_connection (node);
             duration = 3000;
         }
@@ -1407,6 +1412,8 @@ int connection_setup_sockets (ice_config_t *config)
 void connection_close(connection_t *con)
 {
+    WARN("destroying con = %p for fd =%d on %d.", con, con->sock, con->serversock);
+
     sock_close(con->sock);
     if (con->ip) free(con->ip);
     if (con->host) free(con->host);
diff --git a/src/util.c b/src/util.c
index c74d1ce..95e85c4 100644
--- a/src/util.c
+++ b/src/util.c
@@ -156,7 +156,7 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
     }
     if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
-        INFO("util_timed_wait_for_fd <= 0");
+        INFO("util_timed_wait_for_fd <= 0 on (%p)", con);
         return -EAGAIN;
     }
@@ -185,7 +185,7 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
             refbuf->sync_point = 0;
             return endpos;
         }
-        INFO ("missing client data, come back for more");
+        INFO ("missing client data, on (%p) come back for more", con);
         refbuf->sync_point = endpos;
         return endpos;
     }
@@ -197,11 +197,11 @@ int util_read_header(connection_t *con, refbuf_t *refbuf, int flags)
         return -ENOMEM;
     }
-    if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
-        INFO ("util_timed_wait_for_fd <= 0");
+    if (util_timed_wait_for_fd(con->sock, header_timeout) <= 0) {
+        INFO ("util_timed_wait_for_fd <= 0");
         refbuf->sync_point = -pos;
-        return -EAGAIN;
-    }
+        return -EAGAIN;
+    }
     if (time(NULL) > con->con_timeout)
         goto out_FAIL;
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 29/31] connection: client_setup, send 403 when we can for error
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |   21 ++++++++-------------
 1 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index 8d2d906..f1f8a9c 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -691,25 +691,20 @@ static int connection_client_setup (connection_queue_t *node) {
         goto out_fail;
     }
-    err = -EINVAL;
     if (sock_set_blocking (node->con->sock, 0) || sock_set_nodelay (node->con->sock)) {
-        WARN0 ("failed to set tcp options on client connection, dropping");
-        goto out_destroy_client;
-    }
-
-/* XXX(xaiki): this should be 1, but actually, it's buggy, the client is already up and all.. */
-    err = -ENOENT;
-    if (node->con->con_timeout <= time(NULL)) {
-        WARN("there might be a bug if you see this");
-        goto out_destroy_client;
+        if (! sock_recoverable(sock_error())) {
+            node->con->error = 1;
+            err = -EINVAL;
+            goto out_fail;
+        }
+        err = -EINPROGRESS;
+        client_send_403 (node->client, "failed to set tcp options on client connection, dropping");
+        goto out_fail;
     }
-    global_unlock();
     return 0;
-out_destroy_client:
-    client_destroy (node->client);
 out_fail:
     global_unlock();
     return err;
-- 
1.7.1
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 30/31] connection: _close set everything to NULL on the way out
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/connection.c |    6 +++---
 1 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/connection.c b/src/connection.c
index f1f8a9c..ff694af 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1410,10 +1410,10 @@ void connection_close(connection_t *con)
     WARN("destroying con = %p for fd =%d on %d.", con, con->sock, con->serversock);
     sock_close(con->sock);
-    if (con->ip) free(con->ip);
-    if (con->host) free(con->host);
+    if (con->ip) { free(con->ip); con->ip = NULL; };
+    if (con->host) { free(con->host); con->host = NULL; };
 #ifdef HAVE_OPENSSL
-    if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); }
+    if (con->ssl) { SSL_shutdown (con->ssl); SSL_free (con->ssl); con->ssl = NULL; };
 #endif
     free(con);
 }
-- 
1.7.1
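A note on the pattern in this patch: the members are set to NULL just before free(con) releases the struct that holds them, so the pointer that can still dangle afterwards is the caller's own copy of con. Below is a minimal sketch of one common way to cover that case, assuming a close function that takes the address of the caller's pointer. demo_con_t, demo_strdup, demo_con_new and demo_con_close are hypothetical names made up for illustration; they are not part of Icecast and this is not how connection_close is declared there.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for connection_t, reduced to two owned strings. */
typedef struct {
    char *ip;
    char *host;
} demo_con_t;

/* Portable string copy (strdup is POSIX rather than ISO C before C23). */
static char *demo_strdup(const char *s)
{
    size_t n = strlen(s) + 1;
    char *copy = malloc(n);
    if (copy != NULL)
        memcpy(copy, s, n);
    return copy;
}

/* Free the connection and clear the caller's pointer, so that a second
 * close on the same variable is a harmless no-op. */
static void demo_con_close(demo_con_t **conp)
{
    demo_con_t *con;

    if (conp == NULL || *conp == NULL)
        return;
    con = *conp;
    free(con->ip);   /* free(NULL) is defined as a no-op, no guard needed */
    free(con->host);
    free(con);
    *conp = NULL;    /* this is the pointer a caller could still misuse */
}

/* Allocate a connection; returns NULL if any allocation fails. */
static demo_con_t *demo_con_new(const char *ip, const char *host)
{
    demo_con_t *con = calloc(1, sizeof(*con));

    if (con == NULL)
        return NULL;
    con->ip = demo_strdup(ip);
    con->host = demo_strdup(host);
    if (con->ip == NULL || con->host == NULL)
        demo_con_close(&con); /* releases partial state, leaves con NULL */
    return con;
}
```

A caller would write demo_con_close(&con); and can then test con against NULL instead of relying on the freed struct's members.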
Niv Sardi
2010-Jul-30 14:54 UTC
[Icecast-dev] [PATCH 31/31] source: make get_next_buffer try only 10 times and then bail out
Signed-off-by: Niv Sardi <nsardi at smartjog.com>
---
 src/source.c |    6 ++++++
 1 files changed, 6 insertions(+), 0 deletions(-)

diff --git a/src/source.c b/src/source.c
index fa99180..796c548 100644
--- a/src/source.c
+++ b/src/source.c
@@ -457,6 +457,7 @@ static refbuf_t *get_next_buffer (source_t *source)
 {
     refbuf_t *refbuf = NULL;
     int delay = 250;
+    int try = 0;
     if (source->short_delay)
         delay = 0;
@@ -465,6 +466,11 @@ static refbuf_t *get_next_buffer (source_t *source)
         int fds = 0;
         time_t current = time (NULL);
+        if (try++ > 10) {
+            WARN ("Error while waiting on socket (too many trys (%d)), Disconnecting source", try);
+            source->running = 0;
+        }
+
         if (source->client)
             fds = util_timed_wait_for_fd (source->con->sock, delay);
         else
-- 
1.7.1
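For readers following the retry logic: with the post-increment test try++ > 10, the check first fires on the twelfth pass through the loop, and the value logged is 12 rather than 10. Below is a minimal sketch of a bounded polling loop that stops after exactly MAX_ATTEMPTS tries. poll_fn, poll_bounded, MAX_ATTEMPTS and the countdown helper are hypothetical names for illustration only; none of them exist in Icecast, and the real get_next_buffer waits on a socket rather than calling a callback.

```c
#include <stddef.h>

#define MAX_ATTEMPTS 10

/* Hypothetical poll callback: returns non-NULL once data is available. */
typedef void *(*poll_fn)(void *ctx);

/* Call poll() until it yields a result or MAX_ATTEMPTS calls were made.
 * Returns the first non-NULL result, or NULL if every attempt failed.
 * The number of calls actually made is stored in *attempts_out. */
static void *poll_bounded(poll_fn poll, void *ctx, int *attempts_out)
{
    int attempts = 0;
    void *result = NULL;

    while (result == NULL && attempts < MAX_ATTEMPTS) {
        attempts++;
        result = poll(ctx);
    }
    if (attempts_out != NULL)
        *attempts_out = attempts;
    return result;
}

/* Illustration helper: yields its value on the call that brings
 * 'remaining' down to zero, and NULL on the calls before that. */
struct countdown {
    int remaining;
    int value;
};

static void *countdown_poll(void *ctx)
{
    struct countdown *c = ctx;

    if (--c->remaining > 0)
        return NULL;
    return &c->value;
}
```

Counting inside the loop condition keeps the limit and the reported attempt count in agreement, and the caller decides what a NULL return means (in the patch above, marking the source as no longer running).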
Hi Niv, Can you file bugs and attach the patches to them in our bugtracking system? http://trac.xiph.org/ Mail dumps of patches are pretty much guaranteed to not get merged.
On Friday 30 July 2010 12:25:48, Michael Smith wrote:
> All that said: Icecast2 is largely unmaintained these days - I don't
> know if anyone is interested in going through these and figuring out
> which ones are mergeable, which need fixing, and which shouldn't be
> used at all.

Maybe it's time to find and include new contributors? If no one has time to review the patches, perhaps giving Niv Sardi commit access is the right answer? Not necessarily for an immediate stable release, but, hey, committing them somewhere would help users with testing, reporting, etc.

I do not know whether Niv Sardi is interested and can make a mid- to long-term commitment to maintaining this code, but I am sure that at least his company would be committed to it and would benefit from the exposure that an upstream commit can bring in terms of bug reports and fixes.

Otherwise, with such interesting and important changes and interested users, the other alternative, eventually, is a fork.

Romain