From 1cf36baadf6bc840327ab270f3510ab0308b0b31 Mon Sep 17 00:00:00 2001 From: BlackLight Date: Tue, 23 Nov 2010 18:42:20 +0100 Subject: [PATCH] Fixing db, mutex and multithreading small bugs --- alert_history.c | 7 ++ alert_parser.c | 35 ++++----- cluster.c | 18 +++-- correlation.c | 18 +++-- db.c | 28 +++++-- mysql.c | 60 ++++++++------- neural.c | 18 ++++- outdb.c | 198 ++++++++++++++++++++++++++++-------------------- postgresql.c | 14 ++++ spp_ai.c | 1 + spp_ai.h | 6 +- stream.c | 4 + 12 files changed, 251 insertions(+), 156 deletions(-) diff --git a/alert_history.c b/alert_history.c index e5ee496..bd41d04 100644 --- a/alert_history.c +++ b/alert_history.c @@ -107,13 +107,18 @@ AI_deserialize_alerts () if ( j == 0 ) { if ( !( event_list = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) + { AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); + } memset ( event_list, 0, sizeof ( AI_alert_event )); event_iterator = event_list; } else { if ( !( event_iterator = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) + { AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); + } + memset ( event_iterator, 0, sizeof ( AI_alert_event )); } @@ -169,7 +174,9 @@ AI_serialize_alerts ( AI_snort_alert **alerts_pool, unsigned int alerts_pool_cou for ( i=0; i < alerts_pool_count; i++ ) { if ( !( event = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) + { AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); + } memset ( event, 0, sizeof ( AI_alert_event )); key.gid = alerts_pool[i]->gid; diff --git a/alert_parser.c b/alert_parser.c index 8c603b9..70cc1bf 100644 --- a/alert_parser.c +++ b/alert_parser.c @@ -74,7 +74,7 @@ AI_serializer_thread ( void *arg ) pthread_mutex_unlock ( &alerts_pool_mutex ); } - pthread_exit ((void*) 0); + /* pthread_exit ((void*) 0); */ return (void*) 0; } /* ----- end of function AI_serializer_thread ----- */ @@ -95,10 +95,12 @@ AI_alerts_pool_thread ( void *arg ) if ( !alerts_pool || alerts_pool_count == 0 ) continue; - if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, NULL ) != 0 ) - { - AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); - } + AI_serializer_thread((void*) 0); + + /* if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, NULL ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); */ + /* } */ /* if ( pthread_join ( serializer_thread, NULL ) != 0 ) */ /* { */ @@ -293,22 +295,21 @@ AI_file_alertparser_thread ( void* arg ) tmp->next = alert; } - if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) - { - AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); - } + AI_serializer_thread ((void*) alert); - if ( pthread_join ( serializer_thread, NULL ) != 0 ) - { - AI_fatal_err ( "Failed to join the alerts' serializer thread", __FILE__, __LINE__ ); - } + /* if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); */ + /* } */ + + /* if ( pthread_join ( serializer_thread, NULL ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to join the alerts' serializer thread", __FILE__, __LINE__ ); */ + /* } */ if ( config->outdbtype != outdb_none ) { - if ( pthread_create ( &db_thread, NULL, AI_store_alert_to_db_thread, alert ) != 0 ) - { - AI_fatal_err ( "Failed to create the alert to db storing thread", __FILE__, __LINE__ ); - } + AI_store_alert_to_db ( alert ); } in_alert = false; diff --git a/cluster.c b/cluster.c index cfa5eed..796262e 100644 --- a/cluster.c +++ b/cluster.c @@ -289,15 +289,17 @@ __AI_merge_alerts ( AI_snort_alert **log ) alerts_couple->alert1 = tmp; alerts_couple->alert2 = tmp2->next; - if ( pthread_create ( &db_thread, NULL, AI_store_cluster_to_db_thread, alerts_couple ) != 0 ) - { - AI_fatal_err ( "Failed to create the cluster-to-database thread", __FILE__, __LINE__ ); - } + AI_store_cluster_to_db ( alerts_couple ); - if ( pthread_join ( db_thread, NULL ) != 0 ) - { - AI_fatal_err ( "Could not join the cluster-to-database thread", __FILE__, __LINE__ ); - } + /* if ( pthread_create ( &db_thread, NULL, AI_store_cluster_to_db_thread, alerts_couple ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to create the cluster-to-database thread", __FILE__, __LINE__ ); */ + /* } */ + + /* if ( pthread_join ( db_thread, NULL ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Could not join the cluster-to-database thread", __FILE__, __LINE__ ); */ + /* } */ } /* Merge the two alerts */ diff --git a/correlation.c b/correlation.c index 42e5285..7b72097 100644 --- a/correlation.c +++ b/correlation.c @@ -1468,15 +1468,17 @@ AI_alert_correlation_thread ( void *arg ) if ( config->outdbtype != outdb_none ) { - if ( pthread_create ( &db_thread, NULL, AI_store_correlation_to_db_thread, corr ) != 0 ) - { - AI_fatal_err ( "Failed to create the correlation-to-database storing thread", __FILE__, __LINE__ ); - } + AI_store_correlation_to_db ( corr ); - if ( pthread_join ( db_thread, NULL ) != 0 ) - { - AI_fatal_err ( "Failed to join the correlation-to-database storing thread", __FILE__, __LINE__ ); - } + /* if ( pthread_create ( &db_thread, NULL, AI_store_correlation_to_db_thread, corr ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to create the correlation-to-database storing thread", __FILE__, __LINE__ ); */ + /* } */ + + /* if ( pthread_join ( db_thread, NULL ) != 0 ) */ + /* { */ + /* AI_fatal_err ( "Failed to join the correlation-to-database storing thread", __FILE__, __LINE__ ); */ + /* } */ } } } diff --git a/db.c b/db.c index 817d179..b1b809f 100644 --- a/db.c +++ b/db.c @@ -78,24 +78,27 @@ AI_db_alertparser_thread ( void *arg ) while ( 1 ) { sleep ( config->databaseParsingInterval ); - pthread_mutex_lock ( &mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof (query), "select cid, unix_timestamp(timestamp), signature from event where cid > %d " "and unix_timestamp(timestamp) > %ld order by cid", latest_cid, latest_time ); + pthread_mutex_lock ( &mutex ); + if ( !( res = (DB_result) DB_query ( query ))) { + pthread_mutex_unlock ( &mutex ); DB_close(); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &mutex ); + if (( rows = DB_num_rows ( res )) < 0 ) { DB_close(); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); } else if ( rows == 0 ) { - pthread_mutex_unlock ( &mutex ); continue; } @@ -115,12 +118,17 @@ AI_db_alertparser_thread ( void *arg ) snprintf ( query, sizeof ( query ), "select sig_gid, sig_sid, sig_rev, sig_name, sig_priority from signature " "where sig_id='%ld'", strtol ( row[2], NULL, 0 )); + pthread_mutex_lock ( &mutex ); + if ( !( res2 = (DB_result) DB_query ( query ))) { + pthread_mutex_unlock ( &mutex ); DB_close(); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &mutex ); + if (( rows = DB_num_rows ( res2 )) < 0 ) { DB_close(); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); @@ -142,12 +150,17 @@ AI_db_alertparser_thread ( void *arg ) snprintf ( query, sizeof ( query ), "select ip_tos, ip_len, ip_id, ip_ttl, ip_proto, ip_src, ip_dst " "from iphdr where cid='%d'", latest_cid); + pthread_mutex_lock ( &mutex ); + if ( !( res2 = (DB_result) DB_query ( query ))) { + pthread_mutex_unlock ( &mutex ); DB_close(); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &mutex ); + if (( rows = DB_num_rows ( res2 )) < 0 ) { DB_close(); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); @@ -171,12 +184,17 @@ AI_db_alertparser_thread ( void *arg ) snprintf ( query, sizeof ( query ), "select tcp_sport, tcp_dport, tcp_seq, tcp_ack, tcp_flags, tcp_win " "from tcphdr where cid='%d'", latest_cid ); + pthread_mutex_lock ( &mutex ); + if ( !( res2 = (DB_result) DB_query ( query ))) { + pthread_mutex_unlock ( &mutex ); DB_close(); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &mutex ); + if (( rows = DB_num_rows ( res2 )) < 0 ) { DB_close(); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); @@ -218,14 +236,10 @@ AI_db_alertparser_thread ( void *arg ) } } - pthread_mutex_unlock ( &mutex ); DB_free_result ( res ); latest_time = time ( NULL ); - if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) - { - AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); - } + AI_serializer_thread ((void*) alert); } DB_close(); diff --git a/mysql.c b/mysql.c index c7cbdb0..4947391 100644 --- a/mysql.c +++ b/mysql.c @@ -79,20 +79,6 @@ __mysql_do_init ( MYSQL **__DB, BOOL is_out ) return (void*) *__DB; } -PRIVATE MYSQL_RES* -__mysql_do_query ( MYSQL *__DB, const char *query ) -{ - MYSQL_RES *res = NULL; - - if ( mysql_query ( __DB, query )) - return NULL; - - if ( !( res = mysql_store_result ( __DB ))) - return NULL; - - return res; -} - PRIVATE void __mysql_do_close ( MYSQL **__DB ) { @@ -103,6 +89,22 @@ __mysql_do_close ( MYSQL **__DB ) *__DB = NULL; } +PRIVATE MYSQL_RES* +__mysql_do_query ( MYSQL *__DB, const char *query ) +{ + MYSQL_RES *res = NULL; + + if ( mysql_query ( __DB, query )) + { + return NULL; + } + + if ( !( res = mysql_store_result ( __DB ))) + return NULL; + + return res; +} + /* End of private functions */ /****************************/ @@ -121,9 +123,18 @@ mysql_do_init () return __mysql_do_init ( &db, false ); } +BOOL +mysql_is_gone () +{ + return (( mysql_errno ( db ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( db ) == CR_SERVER_LOST )); +} + MYSQL_RES* mysql_do_query ( const char *query ) { + if ( !db ) + mysql_do_init(); + return __mysql_do_query ( db, query ); } @@ -145,12 +156,6 @@ mysql_do_error () return mysql_error ( db ); } -BOOL -mysql_is_gone () -{ - return (( mysql_errno ( db ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( db ) == CR_SERVER_LOST )); -} - void mysql_do_close () { @@ -171,9 +176,18 @@ mysql_do_out_init () return __mysql_do_init ( &outdb, true ); } +BOOL +mysql_is_out_gone () +{ + return (( mysql_errno ( outdb ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( outdb ) == CR_SERVER_LOST )); +} + MYSQL_RES* mysql_do_out_query ( const char *query ) { + if ( !outdb ) + mysql_do_out_init(); + return __mysql_do_query ( outdb, query ); } @@ -195,12 +209,6 @@ mysql_do_out_error () return mysql_error ( outdb ); } -BOOL -mysql_is_out_gone () -{ - return (( mysql_errno ( outdb ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( outdb ) == CR_SERVER_LOST )); -} - void mysql_do_out_close () { diff --git a/neural.c b/neural.c index 936dcc1..32f9d89 100644 --- a/neural.c +++ b/neural.c @@ -67,7 +67,6 @@ AI_neural_correlation_weight () double x = 0, k = (double) config->alert_correlation_weight / HYPERBOLIC_TANGENT_SOLUTION; - snprintf ( query, sizeof ( query ), "SELECT count(*) FROM %s", outdb_config[ALERTS_TABLE] ); pthread_mutex_lock ( &outdb_mutex ); if ( !DB_out_init() ) @@ -76,16 +75,23 @@ AI_neural_correlation_weight () AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &outdb_mutex ); + + snprintf ( query, sizeof ( query ), "SELECT count(*) FROM %s", outdb_config[ALERTS_TABLE] ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { + _dpd.errMsg ( "Warning: Database error while executing the query '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - return 0; + return 0.0; } + pthread_mutex_unlock ( &outdb_mutex ); + row = (DB_row) DB_fetch_row ( res ); x = strtod ( row[0], NULL ); DB_free_result ( res ); - pthread_mutex_unlock ( &outdb_mutex ); return (( exp(x/k) - exp(-x/k) ) / ( exp(x/k) + exp(-x/k) )); } /* ----- end of function AI_neural_correlation_weight ----- */ @@ -331,6 +337,8 @@ __AI_som_train () AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ ); } + pthread_mutex_unlock ( &outdb_mutex ); + #ifdef HAVE_LIBMYSQLCLIENT snprintf ( query, sizeof ( query ), "SELECT gid, sid, rev, unix_timestamp(timestamp), ip_src_addr, ip_dst_addr, tcp_src_port, tcp_dst_port " @@ -351,8 +359,11 @@ __AI_som_train () ); #endif + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { + _dpd.errMsg ( "Warning: Database error while executing the query '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); return; } @@ -405,6 +416,7 @@ __AI_som_train () { if ( !( net = som_network_new ( SOM_NUM_ITEMS, config->outputNeuronsPerSide, config->outputNeuronsPerSide ))) { + pthread_mutex_unlock ( &neural_mutex ); AI_fatal_err ( "AIPreproc: Could not create the neural network", __FILE__, __LINE__ ); } diff --git a/outdb.c b/outdb.c index b8455b7..8fca0ea 100644 --- a/outdb.c +++ b/outdb.c @@ -54,14 +54,14 @@ AI_outdb_mutex_initialize () } /* ----- end of function AI_outdb_mutex_initialize ----- */ /** - * \brief Thread for storing an alert to the database - * \param arg Alert to be stored + * \brief Store an alert to the database + * \param alert Alert to be stored */ -void* -AI_store_alert_to_db_thread ( void *arg ) +void +AI_store_alert_to_db ( AI_snort_alert *alert ) { - char query[65535] = { 0 }, + char query[32768] = { 0 }, iphdr_id_str[20] = { 0 }, tcphdr_id_str[20] = { 0 }, srcip[INET_ADDRSTRLEN], @@ -77,12 +77,16 @@ AI_store_alert_to_db_thread ( void *arg ) struct pkt_info *pkt = NULL; DB_result res; DB_row row; - AI_snort_alert *alert = (AI_snort_alert*) arg; pthread_mutex_lock ( &outdb_mutex ); if ( !DB_out_init() ) + { + pthread_mutex_unlock ( &outdb_mutex ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); + } + + pthread_mutex_unlock ( &outdb_mutex ); inet_ntop ( AF_INET, &(alert->ip_src_addr), srcip, INET_ADDRSTRLEN ); inet_ntop ( AF_INET, &(alert->ip_dst_addr), dstip, INET_ADDRSTRLEN ); @@ -100,24 +104,27 @@ AI_store_alert_to_db_thread ( void *arg ) srcip, dstip ); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), "SELECT MAX(ip_hdr_id) FROM %s", outdb_config[IPV4_HEADERS_TABLE] ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } + pthread_mutex_unlock ( &outdb_mutex ); + if ( !( row = (DB_row) DB_fetch_row ( res ))) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } latest_ip_hdr_id = strtoul ( row[0], NULL, 10 ); @@ -138,24 +145,27 @@ AI_store_alert_to_db_thread ( void *arg ) ntohs (alert->tcp_window ), ntohs (alert->tcp_len )); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), "SELECT MAX(tcp_hdr_id) FROM %s", outdb_config[TCP_HEADERS_TABLE] ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } + pthread_mutex_unlock ( &outdb_mutex ); + if ( !( row = (DB_row) DB_fetch_row ( res ))) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } latest_tcp_hdr_id = strtoul ( row[0], NULL, 10 ); @@ -206,24 +216,27 @@ AI_store_alert_to_db_thread ( void *arg ) tcphdr_id_str ); #endif + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), "SELECT MAX(alert_id) FROM %s", outdb_config[ALERTS_TABLE] ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } + pthread_mutex_unlock ( &outdb_mutex ); + if ( !( row = (DB_row) DB_fetch_row ( res ))) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } latest_alert_id = strtoul ( row[0], NULL, 10 ); @@ -250,47 +263,56 @@ AI_store_alert_to_db_thread ( void *arg ) if ( !( pkt_data = (unsigned char*) alloca ( 2 * ( pkt_size ) + 1 ))) AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); - DB_out_escape_string ( - (char**) &pkt_data, - (const char*) pkt->pkt->pkt_data, - pkt_size ); + if ( pkt->pkt ) + { + if ( pkt->pkt->pkt_data ) + { + if ( strlen ((const char*) pkt->pkt->pkt_data ) != 0 ) + { + DB_out_escape_string ( + (char**) &pkt_data, + (const char*) pkt->pkt->pkt_data, + pkt_size ); - memset ( query, 0, sizeof ( query )); + memset ( query, 0, sizeof ( query )); - #ifdef HAVE_LIBMYSQLCLIENT - snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " - "VALUES (%lu, %u, from_unixtime('%lu'), '%s')", - outdb_config[PACKET_STREAMS_TABLE], - latest_alert_id, - pkt->pkt->pcap_header->len + pkt->pkt->payload_size, - pkt->timestamp, - pkt_data ); - #elif HAVE_LIBPQ - snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " - "VALUES (%lu, %u, timestamp with time zone 'epoch' + %lu * interval '1 second', '%s')", - outdb_config[PACKET_STREAMS_TABLE], - latest_alert_id, - pkt->pkt->pcap_header->len + pkt->pkt->payload_size, - pkt->timestamp, - pkt_data ); - #endif + #ifdef HAVE_LIBMYSQLCLIENT + snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " + "VALUES (%lu, %u, from_unixtime('%lu'), '%s')", + outdb_config[PACKET_STREAMS_TABLE], + latest_alert_id, + pkt->pkt->pcap_header->len + pkt->pkt->payload_size, + pkt->timestamp, + pkt_data ); + #elif HAVE_LIBPQ + snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " + "VALUES (%lu, %u, timestamp with time zone 'epoch' + %lu * interval '1 second', '%s')", + outdb_config[PACKET_STREAMS_TABLE], + latest_alert_id, + pkt->pkt->pcap_header->len + pkt->pkt->payload_size, + pkt->timestamp, + pkt_data ); + #endif - DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_lock ( &outdb_mutex ); + DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); + } + } + } } } - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; -} /* ----- end of function AI_store_alert_to_db_thread ----- */ + return; +} /* ----- end of function AI_store_alert_to_db ----- */ /** * \brief Store an alert cluster to database - * \param arg Struct pointer containing the couple of alerts to be clustered together + * \param alerts_couple Struct pointer containing the couple of alerts to be clustered together */ -void* -AI_store_cluster_to_db_thread ( void *arg ) +void +AI_store_cluster_to_db ( AI_alerts_couple *alerts_couple ) { int i; unsigned long cluster1 = 0, @@ -303,35 +325,35 @@ AI_store_cluster_to_db_thread ( void *arg ) srcport[10] = { 0 }, dstport[10] = { 0 }; - AI_alerts_couple *alerts_couple = (AI_alerts_couple*) arg; AI_couples_cache *found = NULL; DB_result res; DB_row row; BOOL new_cluster = false; - pthread_mutex_lock ( &outdb_mutex ); - /* Check if the couple of alerts is already in our cache, so it already * belongs to the same cluster. If so, just return */ HASH_FIND ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); if ( found ) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } /* Initialize the database (it just does nothing if it is already initialized) */ + pthread_mutex_lock ( &outdb_mutex ); + if ( !DB_out_init() ) + { + pthread_mutex_unlock ( &outdb_mutex ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); + } + + pthread_mutex_unlock ( &outdb_mutex ); /* If one of the two alerts has no alert_id, simply return */ if ( !alerts_couple->alert1->alert_id || !alerts_couple->alert2->alert_id ) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } /* Check if there already exist a cluster containing one of them */ @@ -340,14 +362,16 @@ AI_store_cluster_to_db_thread ( void *arg ) "SELECT cluster_id FROM %s WHERE alert_id=%lu OR alert_id=%lu", outdb_config[ALERTS_TABLE], alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } + pthread_mutex_unlock ( &outdb_mutex ); new_cluster = true; for ( i=0; (row = (DB_row) DB_fetch_row ( res )); i++ ) @@ -379,9 +403,7 @@ AI_store_cluster_to_db_thread ( void *arg ) found->alerts_couple = alerts_couple; found->cluster_id = cluster1; HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } if ( new_cluster ) @@ -403,25 +425,28 @@ AI_store_cluster_to_db_thread ( void *arg ) ((alerts_couple->alert1->h_node[dst_port]) ? alerts_couple->alert1->h_node[dst_port]->label : dstport) ); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), "SELECT MAX(cluster_id) FROM %s", outdb_config[CLUSTERED_ALERTS_TABLE] ); + pthread_mutex_lock ( &outdb_mutex ); + if ( !( res = (DB_result) DB_out_query ( query ))) { _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } + pthread_mutex_unlock ( &outdb_mutex ); + if ( !( row = (DB_row) DB_fetch_row ( res ))) { - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; + return; } latest_cluster_id = strtoul ( row[0], NULL, 10 ); @@ -434,7 +459,9 @@ AI_store_cluster_to_db_thread ( void *arg ) outdb_config[ALERTS_TABLE], latest_cluster_id, alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id ); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); } else { /* Update the alert marked as 'not clustered' */ if ( !cluster1 ) @@ -444,14 +471,18 @@ AI_store_cluster_to_db_thread ( void *arg ) "UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu", outdb_config[ALERTS_TABLE], cluster2, alerts_couple->alert1->alert_id ); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); } else { memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), "UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu", outdb_config[ALERTS_TABLE], cluster1, alerts_couple->alert2->alert_id ); + pthread_mutex_lock ( &outdb_mutex ); DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_unlock ( &outdb_mutex ); } } @@ -462,11 +493,7 @@ AI_store_cluster_to_db_thread ( void *arg ) found->alerts_couple = alerts_couple; found->cluster_id = cluster1; HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); - - pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return (void*) 0; -} /* ----- end of function AI_store_cluster_to_db_thread ----- */ +} /* ----- end of function AI_store_cluster_to_db ----- */ /** @@ -474,17 +501,21 @@ AI_store_cluster_to_db_thread ( void *arg ) * \param arg Structure containing the two alerts to be saved and their correlation */ -void* -AI_store_correlation_to_db_thread ( void *arg ) +void +AI_store_correlation_to_db ( AI_alert_correlation *corr ) { char query[1024] = { 0 }; - AI_alert_correlation *corr = (AI_alert_correlation*) arg; - - pthread_mutex_lock ( &outdb_mutex ); /* Initialize the database (it just does nothing if it is already initialized) */ + pthread_mutex_lock ( &outdb_mutex ); + if ( !DB_out_init() ) + { + pthread_mutex_unlock ( &outdb_mutex ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); + } + + pthread_mutex_unlock ( &outdb_mutex ); memset ( query, 0, sizeof ( query )); snprintf ( query, sizeof ( query ), @@ -494,12 +525,11 @@ AI_store_correlation_to_db_thread ( void *arg ) corr->key.a->alert_id, corr->key.b->alert_id, corr->correlation ); - DB_free_result ((DB_result) DB_out_query ( query )); + pthread_mutex_lock ( &outdb_mutex ); + DB_free_result ((DB_result) DB_out_query ( query )); pthread_mutex_unlock ( &outdb_mutex ); - pthread_exit ((void*) 0); - return 0; -} /* ----- end of function AI_store_correlation_to_db_thread ----- */ +} /* ----- end of function AI_store_correlation_to_db ----- */ #endif diff --git a/postgresql.c b/postgresql.c index 322cee5..9150359 100644 --- a/postgresql.c +++ b/postgresql.c @@ -200,6 +200,13 @@ unsigned long postgresql_do_escape_string ( char **to, const char *from, unsigned long length ) { size_t out_len = 0; + + if ( !from ) + return 0; + + if ( strlen ( from ) == 0 ) + return 0; + *to = (char*) PQescapeByteaConn ( db, (const unsigned char* ) from, (size_t) length, &out_len ); return (unsigned long) out_len; } @@ -234,6 +241,13 @@ unsigned long postgresql_do_out_escape_string ( char **to, const char *from, unsigned long length ) { size_t out_len = 0; + + if ( !from ) + return 0; + + if ( strlen ( from ) == 0 ) + return 0; + *to = (char*) PQescapeByteaConn ( outdb, (const unsigned char* ) from, (size_t) length, &out_len ); return (unsigned long) out_len; } diff --git a/spp_ai.c b/spp_ai.c index 9935e09..7d0f2ef 100644 --- a/spp_ai.c +++ b/spp_ai.c @@ -157,6 +157,7 @@ static void AI_init(char *args) AI_fatal_err ( "Failed to create the neural network thread", __FILE__, __LINE__ ); } } + /* Register the preprocessor function, Transport layer, ID 10000 */ _dpd.addPreproc(AI_process, PRIORITY_TRANSPORT, 10000, PROTO_BIT__TCP | PROTO_BIT__UDP); DEBUG_WRAP(_dpd.debugMsg(DEBUG_PLUGIN, "Preprocessor: AI is initialized\n");); diff --git a/spp_ai.h b/spp_ai.h index 1b0d6db..6c8ee86 100644 --- a/spp_ai.h +++ b/spp_ai.h @@ -552,9 +552,9 @@ double AI_neural_correlation_weight (); double AI_bayesian_correlation_weight (); void AI_outdb_mutex_initialize (); -void* AI_store_alert_to_db_thread ( void* ); -void* AI_store_cluster_to_db_thread ( void* ); -void* AI_store_correlation_to_db_thread ( void* ); +void AI_store_alert_to_db ( AI_snort_alert* ); +void AI_store_cluster_to_db ( AI_alerts_couple* ); +void AI_store_correlation_to_db ( AI_alert_correlation* ); void* AI_neural_clustering_thread ( void* ); AI_alerts_per_neuron* AI_get_alerts_per_neuron (); diff --git a/stream.c b/stream.c index a1f44e8..7c899b8 100644 --- a/stream.c +++ b/stream.c @@ -208,6 +208,10 @@ AI_pkt_enqueue ( SFSnortPacket* pkt ) tmp = NULL; for ( ; found->next; found = found->next ) { + /* Stupid memory bug fixed in a stupid and unelegant way */ + if ( (int) found->next->pkt < 0x100 ) + break; + /* If the sequence number of the next packet in the stream * is bigger than the sequence number of the current packet, * place the current packet before that */