Fixing db, mutex and multithreading small bugs

This commit is contained in:
BlackLight 2010-11-23 18:42:20 +01:00
parent cbee4cb9fa
commit 1cf36baadf
12 changed files with 251 additions and 156 deletions

View file

@ -107,13 +107,18 @@ AI_deserialize_alerts ()
if ( j == 0 ) if ( j == 0 )
{ {
if ( !( event_list = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) if ( !( event_list = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event ))))
{
AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ );
}
memset ( event_list, 0, sizeof ( AI_alert_event )); memset ( event_list, 0, sizeof ( AI_alert_event ));
event_iterator = event_list; event_iterator = event_list;
} else { } else {
if ( !( event_iterator = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) if ( !( event_iterator = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event ))))
{
AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ );
}
memset ( event_iterator, 0, sizeof ( AI_alert_event )); memset ( event_iterator, 0, sizeof ( AI_alert_event ));
} }
@ -169,7 +174,9 @@ AI_serialize_alerts ( AI_snort_alert **alerts_pool, unsigned int alerts_pool_cou
for ( i=0; i < alerts_pool_count; i++ ) for ( i=0; i < alerts_pool_count; i++ )
{ {
if ( !( event = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event )))) if ( !( event = ( AI_alert_event* ) malloc ( sizeof ( AI_alert_event ))))
{
AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ );
}
memset ( event, 0, sizeof ( AI_alert_event )); memset ( event, 0, sizeof ( AI_alert_event ));
key.gid = alerts_pool[i]->gid; key.gid = alerts_pool[i]->gid;

View file

@ -74,7 +74,7 @@ AI_serializer_thread ( void *arg )
pthread_mutex_unlock ( &alerts_pool_mutex ); pthread_mutex_unlock ( &alerts_pool_mutex );
} }
pthread_exit ((void*) 0); /* pthread_exit ((void*) 0); */
return (void*) 0; return (void*) 0;
} /* ----- end of function AI_serializer_thread ----- */ } /* ----- end of function AI_serializer_thread ----- */
@ -95,10 +95,12 @@ AI_alerts_pool_thread ( void *arg )
if ( !alerts_pool || alerts_pool_count == 0 ) if ( !alerts_pool || alerts_pool_count == 0 )
continue; continue;
if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, NULL ) != 0 ) AI_serializer_thread((void*) 0);
{
AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); /* if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, NULL ) != 0 ) */
} /* { */
/* AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); */
/* } */
/* if ( pthread_join ( serializer_thread, NULL ) != 0 ) */ /* if ( pthread_join ( serializer_thread, NULL ) != 0 ) */
/* { */ /* { */
@ -293,22 +295,21 @@ AI_file_alertparser_thread ( void* arg )
tmp->next = alert; tmp->next = alert;
} }
if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) AI_serializer_thread ((void*) alert);
{
AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ );
}
if ( pthread_join ( serializer_thread, NULL ) != 0 ) /* if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) */
{ /* { */
AI_fatal_err ( "Failed to join the alerts' serializer thread", __FILE__, __LINE__ ); /* AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ ); */
} /* } */
/* if ( pthread_join ( serializer_thread, NULL ) != 0 ) */
/* { */
/* AI_fatal_err ( "Failed to join the alerts' serializer thread", __FILE__, __LINE__ ); */
/* } */
if ( config->outdbtype != outdb_none ) if ( config->outdbtype != outdb_none )
{ {
if ( pthread_create ( &db_thread, NULL, AI_store_alert_to_db_thread, alert ) != 0 ) AI_store_alert_to_db ( alert );
{
AI_fatal_err ( "Failed to create the alert to db storing thread", __FILE__, __LINE__ );
}
} }
in_alert = false; in_alert = false;

View file

@ -289,15 +289,17 @@ __AI_merge_alerts ( AI_snort_alert **log )
alerts_couple->alert1 = tmp; alerts_couple->alert1 = tmp;
alerts_couple->alert2 = tmp2->next; alerts_couple->alert2 = tmp2->next;
if ( pthread_create ( &db_thread, NULL, AI_store_cluster_to_db_thread, alerts_couple ) != 0 ) AI_store_cluster_to_db ( alerts_couple );
{
AI_fatal_err ( "Failed to create the cluster-to-database thread", __FILE__, __LINE__ );
}
if ( pthread_join ( db_thread, NULL ) != 0 ) /* if ( pthread_create ( &db_thread, NULL, AI_store_cluster_to_db_thread, alerts_couple ) != 0 ) */
{ /* { */
AI_fatal_err ( "Could not join the cluster-to-database thread", __FILE__, __LINE__ ); /* AI_fatal_err ( "Failed to create the cluster-to-database thread", __FILE__, __LINE__ ); */
} /* } */
/* if ( pthread_join ( db_thread, NULL ) != 0 ) */
/* { */
/* AI_fatal_err ( "Could not join the cluster-to-database thread", __FILE__, __LINE__ ); */
/* } */
} }
/* Merge the two alerts */ /* Merge the two alerts */

View file

@ -1468,15 +1468,17 @@ AI_alert_correlation_thread ( void *arg )
if ( config->outdbtype != outdb_none ) if ( config->outdbtype != outdb_none )
{ {
if ( pthread_create ( &db_thread, NULL, AI_store_correlation_to_db_thread, corr ) != 0 ) AI_store_correlation_to_db ( corr );
{
AI_fatal_err ( "Failed to create the correlation-to-database storing thread", __FILE__, __LINE__ );
}
if ( pthread_join ( db_thread, NULL ) != 0 ) /* if ( pthread_create ( &db_thread, NULL, AI_store_correlation_to_db_thread, corr ) != 0 ) */
{ /* { */
AI_fatal_err ( "Failed to join the correlation-to-database storing thread", __FILE__, __LINE__ ); /* AI_fatal_err ( "Failed to create the correlation-to-database storing thread", __FILE__, __LINE__ ); */
} /* } */
/* if ( pthread_join ( db_thread, NULL ) != 0 ) */
/* { */
/* AI_fatal_err ( "Failed to join the correlation-to-database storing thread", __FILE__, __LINE__ ); */
/* } */
} }
} }
} }

28
db.c
View file

@ -78,24 +78,27 @@ AI_db_alertparser_thread ( void *arg )
while ( 1 ) while ( 1 )
{ {
sleep ( config->databaseParsingInterval ); sleep ( config->databaseParsingInterval );
pthread_mutex_lock ( &mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof (query), "select cid, unix_timestamp(timestamp), signature from event where cid > %d " snprintf ( query, sizeof (query), "select cid, unix_timestamp(timestamp), signature from event where cid > %d "
"and unix_timestamp(timestamp) > %ld order by cid", latest_cid, latest_time ); "and unix_timestamp(timestamp) > %ld order by cid", latest_cid, latest_time );
pthread_mutex_lock ( &mutex );
if ( !( res = (DB_result) DB_query ( query ))) if ( !( res = (DB_result) DB_query ( query )))
{ {
pthread_mutex_unlock ( &mutex );
DB_close(); DB_close();
AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &mutex );
if (( rows = DB_num_rows ( res )) < 0 ) if (( rows = DB_num_rows ( res )) < 0 )
{ {
DB_close(); DB_close();
AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ );
} else if ( rows == 0 ) { } else if ( rows == 0 ) {
pthread_mutex_unlock ( &mutex );
continue; continue;
} }
@ -115,12 +118,17 @@ AI_db_alertparser_thread ( void *arg )
snprintf ( query, sizeof ( query ), "select sig_gid, sig_sid, sig_rev, sig_name, sig_priority from signature " snprintf ( query, sizeof ( query ), "select sig_gid, sig_sid, sig_rev, sig_name, sig_priority from signature "
"where sig_id='%ld'", strtol ( row[2], NULL, 0 )); "where sig_id='%ld'", strtol ( row[2], NULL, 0 ));
pthread_mutex_lock ( &mutex );
if ( !( res2 = (DB_result) DB_query ( query ))) if ( !( res2 = (DB_result) DB_query ( query )))
{ {
pthread_mutex_unlock ( &mutex );
DB_close(); DB_close();
AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &mutex );
if (( rows = DB_num_rows ( res2 )) < 0 ) { if (( rows = DB_num_rows ( res2 )) < 0 ) {
DB_close(); DB_close();
AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ );
@ -142,12 +150,17 @@ AI_db_alertparser_thread ( void *arg )
snprintf ( query, sizeof ( query ), "select ip_tos, ip_len, ip_id, ip_ttl, ip_proto, ip_src, ip_dst " snprintf ( query, sizeof ( query ), "select ip_tos, ip_len, ip_id, ip_ttl, ip_proto, ip_src, ip_dst "
"from iphdr where cid='%d'", latest_cid); "from iphdr where cid='%d'", latest_cid);
pthread_mutex_lock ( &mutex );
if ( !( res2 = (DB_result) DB_query ( query ))) if ( !( res2 = (DB_result) DB_query ( query )))
{ {
pthread_mutex_unlock ( &mutex );
DB_close(); DB_close();
AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &mutex );
if (( rows = DB_num_rows ( res2 )) < 0 ) { if (( rows = DB_num_rows ( res2 )) < 0 ) {
DB_close(); DB_close();
AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ );
@ -171,12 +184,17 @@ AI_db_alertparser_thread ( void *arg )
snprintf ( query, sizeof ( query ), "select tcp_sport, tcp_dport, tcp_seq, tcp_ack, tcp_flags, tcp_win " snprintf ( query, sizeof ( query ), "select tcp_sport, tcp_dport, tcp_seq, tcp_ack, tcp_flags, tcp_win "
"from tcphdr where cid='%d'", latest_cid ); "from tcphdr where cid='%d'", latest_cid );
pthread_mutex_lock ( &mutex );
if ( !( res2 = (DB_result) DB_query ( query ))) if ( !( res2 = (DB_result) DB_query ( query )))
{ {
pthread_mutex_unlock ( &mutex );
DB_close(); DB_close();
AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal error while executing a query on the database", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &mutex );
if (( rows = DB_num_rows ( res2 )) < 0 ) { if (( rows = DB_num_rows ( res2 )) < 0 ) {
DB_close(); DB_close();
AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ ); AI_fatal_err ( "Could not store the query result", __FILE__, __LINE__ );
@ -218,14 +236,10 @@ AI_db_alertparser_thread ( void *arg )
} }
} }
pthread_mutex_unlock ( &mutex );
DB_free_result ( res ); DB_free_result ( res );
latest_time = time ( NULL ); latest_time = time ( NULL );
if ( pthread_create ( &serializer_thread, NULL, AI_serializer_thread, alert ) != 0 ) AI_serializer_thread ((void*) alert);
{
AI_fatal_err ( "Failed to create the alerts' serializer thread", __FILE__, __LINE__ );
}
} }
DB_close(); DB_close();

60
mysql.c
View file

@ -79,20 +79,6 @@ __mysql_do_init ( MYSQL **__DB, BOOL is_out )
return (void*) *__DB; return (void*) *__DB;
} }
PRIVATE MYSQL_RES*
__mysql_do_query ( MYSQL *__DB, const char *query )
{
MYSQL_RES *res = NULL;
if ( mysql_query ( __DB, query ))
return NULL;
if ( !( res = mysql_store_result ( __DB )))
return NULL;
return res;
}
PRIVATE void PRIVATE void
__mysql_do_close ( MYSQL **__DB ) __mysql_do_close ( MYSQL **__DB )
{ {
@ -103,6 +89,22 @@ __mysql_do_close ( MYSQL **__DB )
*__DB = NULL; *__DB = NULL;
} }
PRIVATE MYSQL_RES*
__mysql_do_query ( MYSQL *__DB, const char *query )
{
MYSQL_RES *res = NULL;
if ( mysql_query ( __DB, query ))
{
return NULL;
}
if ( !( res = mysql_store_result ( __DB )))
return NULL;
return res;
}
/* End of private functions */ /* End of private functions */
/****************************/ /****************************/
@ -121,9 +123,18 @@ mysql_do_init ()
return __mysql_do_init ( &db, false ); return __mysql_do_init ( &db, false );
} }
BOOL
mysql_is_gone ()
{
return (( mysql_errno ( db ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( db ) == CR_SERVER_LOST ));
}
MYSQL_RES* MYSQL_RES*
mysql_do_query ( const char *query ) mysql_do_query ( const char *query )
{ {
if ( !db )
mysql_do_init();
return __mysql_do_query ( db, query ); return __mysql_do_query ( db, query );
} }
@ -145,12 +156,6 @@ mysql_do_error ()
return mysql_error ( db ); return mysql_error ( db );
} }
BOOL
mysql_is_gone ()
{
return (( mysql_errno ( db ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( db ) == CR_SERVER_LOST ));
}
void void
mysql_do_close () mysql_do_close ()
{ {
@ -171,9 +176,18 @@ mysql_do_out_init ()
return __mysql_do_init ( &outdb, true ); return __mysql_do_init ( &outdb, true );
} }
BOOL
mysql_is_out_gone ()
{
return (( mysql_errno ( outdb ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( outdb ) == CR_SERVER_LOST ));
}
MYSQL_RES* MYSQL_RES*
mysql_do_out_query ( const char *query ) mysql_do_out_query ( const char *query )
{ {
if ( !outdb )
mysql_do_out_init();
return __mysql_do_query ( outdb, query ); return __mysql_do_query ( outdb, query );
} }
@ -195,12 +209,6 @@ mysql_do_out_error ()
return mysql_error ( outdb ); return mysql_error ( outdb );
} }
BOOL
mysql_is_out_gone ()
{
return (( mysql_errno ( outdb ) == CR_SERVER_GONE_ERROR ) || ( mysql_errno ( outdb ) == CR_SERVER_LOST ));
}
void void
mysql_do_out_close () mysql_do_out_close ()
{ {

View file

@ -67,7 +67,6 @@ AI_neural_correlation_weight ()
double x = 0, double x = 0,
k = (double) config->alert_correlation_weight / HYPERBOLIC_TANGENT_SOLUTION; k = (double) config->alert_correlation_weight / HYPERBOLIC_TANGENT_SOLUTION;
snprintf ( query, sizeof ( query ), "SELECT count(*) FROM %s", outdb_config[ALERTS_TABLE] );
pthread_mutex_lock ( &outdb_mutex ); pthread_mutex_lock ( &outdb_mutex );
if ( !DB_out_init() ) if ( !DB_out_init() )
@ -76,16 +75,23 @@ AI_neural_correlation_weight ()
AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ ); AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &outdb_mutex );
snprintf ( query, sizeof ( query ), "SELECT count(*) FROM %s", outdb_config[ALERTS_TABLE] );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.errMsg ( "Warning: Database error while executing the query '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
return 0; return 0.0;
} }
pthread_mutex_unlock ( &outdb_mutex );
row = (DB_row) DB_fetch_row ( res ); row = (DB_row) DB_fetch_row ( res );
x = strtod ( row[0], NULL ); x = strtod ( row[0], NULL );
DB_free_result ( res ); DB_free_result ( res );
pthread_mutex_unlock ( &outdb_mutex );
return (( exp(x/k) - exp(-x/k) ) / ( exp(x/k) + exp(-x/k) )); return (( exp(x/k) - exp(-x/k) ) / ( exp(x/k) + exp(-x/k) ));
} /* ----- end of function AI_neural_correlation_weight ----- */ } /* ----- end of function AI_neural_correlation_weight ----- */
@ -331,6 +337,8 @@ __AI_som_train ()
AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ ); AI_fatal_err ( "Unable to connect to the database specified in module configuration", __FILE__, __LINE__ );
} }
pthread_mutex_unlock ( &outdb_mutex );
#ifdef HAVE_LIBMYSQLCLIENT #ifdef HAVE_LIBMYSQLCLIENT
snprintf ( query, sizeof ( query ), snprintf ( query, sizeof ( query ),
"SELECT gid, sid, rev, unix_timestamp(timestamp), ip_src_addr, ip_dst_addr, tcp_src_port, tcp_dst_port " "SELECT gid, sid, rev, unix_timestamp(timestamp), ip_src_addr, ip_dst_addr, tcp_src_port, tcp_dst_port "
@ -351,8 +359,11 @@ __AI_som_train ()
); );
#endif #endif
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.errMsg ( "Warning: Database error while executing the query '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
return; return;
} }
@ -405,6 +416,7 @@ __AI_som_train ()
{ {
if ( !( net = som_network_new ( SOM_NUM_ITEMS, config->outputNeuronsPerSide, config->outputNeuronsPerSide ))) if ( !( net = som_network_new ( SOM_NUM_ITEMS, config->outputNeuronsPerSide, config->outputNeuronsPerSide )))
{ {
pthread_mutex_unlock ( &neural_mutex );
AI_fatal_err ( "AIPreproc: Could not create the neural network", __FILE__, __LINE__ ); AI_fatal_err ( "AIPreproc: Could not create the neural network", __FILE__, __LINE__ );
} }

198
outdb.c
View file

@ -54,14 +54,14 @@ AI_outdb_mutex_initialize ()
} /* ----- end of function AI_outdb_mutex_initialize ----- */ } /* ----- end of function AI_outdb_mutex_initialize ----- */
/** /**
* \brief Thread for storing an alert to the database * \brief Store an alert to the database
* \param arg Alert to be stored * \param alert Alert to be stored
*/ */
void* void
AI_store_alert_to_db_thread ( void *arg ) AI_store_alert_to_db ( AI_snort_alert *alert )
{ {
char query[65535] = { 0 }, char query[32768] = { 0 },
iphdr_id_str[20] = { 0 }, iphdr_id_str[20] = { 0 },
tcphdr_id_str[20] = { 0 }, tcphdr_id_str[20] = { 0 },
srcip[INET_ADDRSTRLEN], srcip[INET_ADDRSTRLEN],
@ -77,12 +77,16 @@ AI_store_alert_to_db_thread ( void *arg )
struct pkt_info *pkt = NULL; struct pkt_info *pkt = NULL;
DB_result res; DB_result res;
DB_row row; DB_row row;
AI_snort_alert *alert = (AI_snort_alert*) arg;
pthread_mutex_lock ( &outdb_mutex ); pthread_mutex_lock ( &outdb_mutex );
if ( !DB_out_init() ) if ( !DB_out_init() )
{
pthread_mutex_unlock ( &outdb_mutex );
AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ );
}
pthread_mutex_unlock ( &outdb_mutex );
inet_ntop ( AF_INET, &(alert->ip_src_addr), srcip, INET_ADDRSTRLEN ); inet_ntop ( AF_INET, &(alert->ip_src_addr), srcip, INET_ADDRSTRLEN );
inet_ntop ( AF_INET, &(alert->ip_dst_addr), dstip, INET_ADDRSTRLEN ); inet_ntop ( AF_INET, &(alert->ip_dst_addr), dstip, INET_ADDRSTRLEN );
@ -100,24 +104,27 @@ AI_store_alert_to_db_thread ( void *arg )
srcip, srcip,
dstip ); dstip );
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), "SELECT MAX(ip_hdr_id) FROM %s", outdb_config[IPV4_HEADERS_TABLE] ); snprintf ( query, sizeof ( query ), "SELECT MAX(ip_hdr_id) FROM %s", outdb_config[IPV4_HEADERS_TABLE] );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); return;
return (void*) 0;
} }
pthread_mutex_unlock ( &outdb_mutex );
if ( !( row = (DB_row) DB_fetch_row ( res ))) if ( !( row = (DB_row) DB_fetch_row ( res )))
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
latest_ip_hdr_id = strtoul ( row[0], NULL, 10 ); latest_ip_hdr_id = strtoul ( row[0], NULL, 10 );
@ -138,24 +145,27 @@ AI_store_alert_to_db_thread ( void *arg )
ntohs (alert->tcp_window ), ntohs (alert->tcp_window ),
ntohs (alert->tcp_len )); ntohs (alert->tcp_len ));
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), "SELECT MAX(tcp_hdr_id) FROM %s", outdb_config[TCP_HEADERS_TABLE] ); snprintf ( query, sizeof ( query ), "SELECT MAX(tcp_hdr_id) FROM %s", outdb_config[TCP_HEADERS_TABLE] );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); return;
return (void*) 0;
} }
pthread_mutex_unlock ( &outdb_mutex );
if ( !( row = (DB_row) DB_fetch_row ( res ))) if ( !( row = (DB_row) DB_fetch_row ( res )))
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
latest_tcp_hdr_id = strtoul ( row[0], NULL, 10 ); latest_tcp_hdr_id = strtoul ( row[0], NULL, 10 );
@ -206,24 +216,27 @@ AI_store_alert_to_db_thread ( void *arg )
tcphdr_id_str ); tcphdr_id_str );
#endif #endif
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), "SELECT MAX(alert_id) FROM %s", outdb_config[ALERTS_TABLE] ); snprintf ( query, sizeof ( query ), "SELECT MAX(alert_id) FROM %s", outdb_config[ALERTS_TABLE] );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); return;
return (void*) 0;
} }
pthread_mutex_unlock ( &outdb_mutex );
if ( !( row = (DB_row) DB_fetch_row ( res ))) if ( !( row = (DB_row) DB_fetch_row ( res )))
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
latest_alert_id = strtoul ( row[0], NULL, 10 ); latest_alert_id = strtoul ( row[0], NULL, 10 );
@ -250,47 +263,56 @@ AI_store_alert_to_db_thread ( void *arg )
if ( !( pkt_data = (unsigned char*) alloca ( 2 * ( pkt_size ) + 1 ))) if ( !( pkt_data = (unsigned char*) alloca ( 2 * ( pkt_size ) + 1 )))
AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ ); AI_fatal_err ( "Fatal dynamic memory allocation error", __FILE__, __LINE__ );
DB_out_escape_string ( if ( pkt->pkt )
(char**) &pkt_data, {
(const char*) pkt->pkt->pkt_data, if ( pkt->pkt->pkt_data )
pkt_size ); {
if ( strlen ((const char*) pkt->pkt->pkt_data ) != 0 )
{
DB_out_escape_string (
(char**) &pkt_data,
(const char*) pkt->pkt->pkt_data,
pkt_size );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
#ifdef HAVE_LIBMYSQLCLIENT #ifdef HAVE_LIBMYSQLCLIENT
snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) "
"VALUES (%lu, %u, from_unixtime('%lu'), '%s')", "VALUES (%lu, %u, from_unixtime('%lu'), '%s')",
outdb_config[PACKET_STREAMS_TABLE], outdb_config[PACKET_STREAMS_TABLE],
latest_alert_id, latest_alert_id,
pkt->pkt->pcap_header->len + pkt->pkt->payload_size, pkt->pkt->pcap_header->len + pkt->pkt->payload_size,
pkt->timestamp, pkt->timestamp,
pkt_data ); pkt_data );
#elif HAVE_LIBPQ #elif HAVE_LIBPQ
snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) " snprintf ( query, sizeof ( query ), "INSERT INTO %s (alert_id, pkt_len, timestamp, content) "
"VALUES (%lu, %u, timestamp with time zone 'epoch' + %lu * interval '1 second', '%s')", "VALUES (%lu, %u, timestamp with time zone 'epoch' + %lu * interval '1 second', '%s')",
outdb_config[PACKET_STREAMS_TABLE], outdb_config[PACKET_STREAMS_TABLE],
latest_alert_id, latest_alert_id,
pkt->pkt->pcap_header->len + pkt->pkt->payload_size, pkt->pkt->pcap_header->len + pkt->pkt->payload_size,
pkt->timestamp, pkt->timestamp,
pkt_data ); pkt_data );
#endif #endif
DB_free_result ((DB_result) DB_out_query ( query )); pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
}
}
}
} }
} }
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0); } /* ----- end of function AI_store_alert_to_db ----- */
return (void*) 0;
} /* ----- end of function AI_store_alert_to_db_thread ----- */
/** /**
* \brief Store an alert cluster to database * \brief Store an alert cluster to database
* \param arg Struct pointer containing the couple of alerts to be clustered together * \param alerts_couple Struct pointer containing the couple of alerts to be clustered together
*/ */
void* void
AI_store_cluster_to_db_thread ( void *arg ) AI_store_cluster_to_db ( AI_alerts_couple *alerts_couple )
{ {
int i; int i;
unsigned long cluster1 = 0, unsigned long cluster1 = 0,
@ -303,35 +325,35 @@ AI_store_cluster_to_db_thread ( void *arg )
srcport[10] = { 0 }, srcport[10] = { 0 },
dstport[10] = { 0 }; dstport[10] = { 0 };
AI_alerts_couple *alerts_couple = (AI_alerts_couple*) arg;
AI_couples_cache *found = NULL; AI_couples_cache *found = NULL;
DB_result res; DB_result res;
DB_row row; DB_row row;
BOOL new_cluster = false; BOOL new_cluster = false;
pthread_mutex_lock ( &outdb_mutex );
/* Check if the couple of alerts is already in our cache, so it already /* Check if the couple of alerts is already in our cache, so it already
* belongs to the same cluster. If so, just return */ * belongs to the same cluster. If so, just return */
HASH_FIND ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); HASH_FIND ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found );
if ( found ) if ( found )
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
/* Initialize the database (it just does nothing if it is already initialized) */ /* Initialize the database (it just does nothing if it is already initialized) */
pthread_mutex_lock ( &outdb_mutex );
if ( !DB_out_init() ) if ( !DB_out_init() )
{
pthread_mutex_unlock ( &outdb_mutex );
AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ );
}
pthread_mutex_unlock ( &outdb_mutex );
/* If one of the two alerts has no alert_id, simply return */ /* If one of the two alerts has no alert_id, simply return */
if ( !alerts_couple->alert1->alert_id || !alerts_couple->alert2->alert_id ) if ( !alerts_couple->alert1->alert_id || !alerts_couple->alert2->alert_id )
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
/* Check if there already exist a cluster containing one of them */ /* Check if there already exist a cluster containing one of them */
@ -340,14 +362,16 @@ AI_store_cluster_to_db_thread ( void *arg )
"SELECT cluster_id FROM %s WHERE alert_id=%lu OR alert_id=%lu", "SELECT cluster_id FROM %s WHERE alert_id=%lu OR alert_id=%lu",
outdb_config[ALERTS_TABLE], alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id ); outdb_config[ALERTS_TABLE], alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); return;
return (void*) 0;
} }
pthread_mutex_unlock ( &outdb_mutex );
new_cluster = true; new_cluster = true;
for ( i=0; (row = (DB_row) DB_fetch_row ( res )); i++ ) for ( i=0; (row = (DB_row) DB_fetch_row ( res )); i++ )
@ -379,9 +403,7 @@ AI_store_cluster_to_db_thread ( void *arg )
found->alerts_couple = alerts_couple; found->alerts_couple = alerts_couple;
found->cluster_id = cluster1; found->cluster_id = cluster1;
HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found );
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
if ( new_cluster ) if ( new_cluster )
@ -403,25 +425,28 @@ AI_store_cluster_to_db_thread ( void *arg )
((alerts_couple->alert1->h_node[dst_port]) ? alerts_couple->alert1->h_node[dst_port]->label : dstport) ((alerts_couple->alert1->h_node[dst_port]) ? alerts_couple->alert1->h_node[dst_port]->label : dstport)
); );
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), snprintf ( query, sizeof ( query ),
"SELECT MAX(cluster_id) FROM %s", outdb_config[CLUSTERED_ALERTS_TABLE] ); "SELECT MAX(cluster_id) FROM %s", outdb_config[CLUSTERED_ALERTS_TABLE] );
pthread_mutex_lock ( &outdb_mutex );
if ( !( res = (DB_result) DB_out_query ( query ))) if ( !( res = (DB_result) DB_out_query ( query )))
{ {
_dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query ); _dpd.logMsg ( "AIPreproc: Warning: error in executing query: '%s'\n", query );
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); return;
return (void*) 0;
} }
pthread_mutex_unlock ( &outdb_mutex );
if ( !( row = (DB_row) DB_fetch_row ( res ))) if ( !( row = (DB_row) DB_fetch_row ( res )))
{ {
pthread_mutex_unlock ( &outdb_mutex ); return;
pthread_exit ((void*) 0);
return (void*) 0;
} }
latest_cluster_id = strtoul ( row[0], NULL, 10 ); latest_cluster_id = strtoul ( row[0], NULL, 10 );
@ -434,7 +459,9 @@ AI_store_cluster_to_db_thread ( void *arg )
outdb_config[ALERTS_TABLE], latest_cluster_id, outdb_config[ALERTS_TABLE], latest_cluster_id,
alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id ); alerts_couple->alert1->alert_id, alerts_couple->alert2->alert_id );
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
} else { } else {
/* Update the alert marked as 'not clustered' */ /* Update the alert marked as 'not clustered' */
if ( !cluster1 ) if ( !cluster1 )
@ -444,14 +471,18 @@ AI_store_cluster_to_db_thread ( void *arg )
"UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu", "UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu",
outdb_config[ALERTS_TABLE], cluster2, alerts_couple->alert1->alert_id ); outdb_config[ALERTS_TABLE], cluster2, alerts_couple->alert1->alert_id );
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
} else { } else {
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), snprintf ( query, sizeof ( query ),
"UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu", "UPDATE %s SET cluster_id=%lu WHERE alert_id=%lu",
outdb_config[ALERTS_TABLE], cluster1, alerts_couple->alert2->alert_id ); outdb_config[ALERTS_TABLE], cluster1, alerts_couple->alert2->alert_id );
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query )); DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex );
} }
} }
@ -462,11 +493,7 @@ AI_store_cluster_to_db_thread ( void *arg )
found->alerts_couple = alerts_couple; found->alerts_couple = alerts_couple;
found->cluster_id = cluster1; found->cluster_id = cluster1;
HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found ); HASH_ADD ( hh, couples_cache, alerts_couple, sizeof ( AI_alerts_couple ), found );
} /* ----- end of function AI_store_cluster_to_db ----- */
pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0);
return (void*) 0;
} /* ----- end of function AI_store_cluster_to_db_thread ----- */
/** /**
@ -474,17 +501,21 @@ AI_store_cluster_to_db_thread ( void *arg )
* \param arg Structure containing the two alerts to be saved and their correlation * \param arg Structure containing the two alerts to be saved and their correlation
*/ */
void* void
AI_store_correlation_to_db_thread ( void *arg ) AI_store_correlation_to_db ( AI_alert_correlation *corr )
{ {
char query[1024] = { 0 }; char query[1024] = { 0 };
AI_alert_correlation *corr = (AI_alert_correlation*) arg;
pthread_mutex_lock ( &outdb_mutex );
/* Initialize the database (it just does nothing if it is already initialized) */ /* Initialize the database (it just does nothing if it is already initialized) */
pthread_mutex_lock ( &outdb_mutex );
if ( !DB_out_init() ) if ( !DB_out_init() )
{
pthread_mutex_unlock ( &outdb_mutex );
AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ ); AI_fatal_err ( "Unable to connect to the specified output database", __FILE__, __LINE__ );
}
pthread_mutex_unlock ( &outdb_mutex );
memset ( query, 0, sizeof ( query )); memset ( query, 0, sizeof ( query ));
snprintf ( query, sizeof ( query ), snprintf ( query, sizeof ( query ),
@ -494,12 +525,11 @@ AI_store_correlation_to_db_thread ( void *arg )
corr->key.a->alert_id, corr->key.a->alert_id,
corr->key.b->alert_id, corr->key.b->alert_id,
corr->correlation ); corr->correlation );
DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_lock ( &outdb_mutex );
DB_free_result ((DB_result) DB_out_query ( query ));
pthread_mutex_unlock ( &outdb_mutex ); pthread_mutex_unlock ( &outdb_mutex );
pthread_exit ((void*) 0); } /* ----- end of function AI_store_correlation_to_db ----- */
return 0;
} /* ----- end of function AI_store_correlation_to_db_thread ----- */
#endif #endif

View file

@ -200,6 +200,13 @@ unsigned long
postgresql_do_escape_string ( char **to, const char *from, unsigned long length ) postgresql_do_escape_string ( char **to, const char *from, unsigned long length )
{ {
size_t out_len = 0; size_t out_len = 0;
if ( !from )
return 0;
if ( strlen ( from ) == 0 )
return 0;
*to = (char*) PQescapeByteaConn ( db, (const unsigned char* ) from, (size_t) length, &out_len ); *to = (char*) PQescapeByteaConn ( db, (const unsigned char* ) from, (size_t) length, &out_len );
return (unsigned long) out_len; return (unsigned long) out_len;
} }
@ -234,6 +241,13 @@ unsigned long
postgresql_do_out_escape_string ( char **to, const char *from, unsigned long length ) postgresql_do_out_escape_string ( char **to, const char *from, unsigned long length )
{ {
size_t out_len = 0; size_t out_len = 0;
if ( !from )
return 0;
if ( strlen ( from ) == 0 )
return 0;
*to = (char*) PQescapeByteaConn ( outdb, (const unsigned char* ) from, (size_t) length, &out_len ); *to = (char*) PQescapeByteaConn ( outdb, (const unsigned char* ) from, (size_t) length, &out_len );
return (unsigned long) out_len; return (unsigned long) out_len;
} }

View file

@ -157,6 +157,7 @@ static void AI_init(char *args)
AI_fatal_err ( "Failed to create the neural network thread", __FILE__, __LINE__ ); AI_fatal_err ( "Failed to create the neural network thread", __FILE__, __LINE__ );
} }
} }
/* Register the preprocessor function, Transport layer, ID 10000 */ /* Register the preprocessor function, Transport layer, ID 10000 */
_dpd.addPreproc(AI_process, PRIORITY_TRANSPORT, 10000, PROTO_BIT__TCP | PROTO_BIT__UDP); _dpd.addPreproc(AI_process, PRIORITY_TRANSPORT, 10000, PROTO_BIT__TCP | PROTO_BIT__UDP);
DEBUG_WRAP(_dpd.debugMsg(DEBUG_PLUGIN, "Preprocessor: AI is initialized\n");); DEBUG_WRAP(_dpd.debugMsg(DEBUG_PLUGIN, "Preprocessor: AI is initialized\n"););

View file

@ -552,9 +552,9 @@ double AI_neural_correlation_weight ();
double AI_bayesian_correlation_weight (); double AI_bayesian_correlation_weight ();
void AI_outdb_mutex_initialize (); void AI_outdb_mutex_initialize ();
void* AI_store_alert_to_db_thread ( void* ); void AI_store_alert_to_db ( AI_snort_alert* );
void* AI_store_cluster_to_db_thread ( void* ); void AI_store_cluster_to_db ( AI_alerts_couple* );
void* AI_store_correlation_to_db_thread ( void* ); void AI_store_correlation_to_db ( AI_alert_correlation* );
void* AI_neural_clustering_thread ( void* ); void* AI_neural_clustering_thread ( void* );
AI_alerts_per_neuron* AI_get_alerts_per_neuron (); AI_alerts_per_neuron* AI_get_alerts_per_neuron ();

View file

@ -208,6 +208,10 @@ AI_pkt_enqueue ( SFSnortPacket* pkt )
tmp = NULL; tmp = NULL;
for ( ; found->next; found = found->next ) { for ( ; found->next; found = found->next ) {
/* Stupid memory bug fixed in a stupid and unelegant way */
if ( (int) found->next->pkt < 0x100 )
break;
/* If the sequence number of the next packet in the stream /* If the sequence number of the next packet in the stream
* is bigger than the sequence number of the current packet, * is bigger than the sequence number of the current packet,
* place the current packet before that */ * place the current packet before that */