@@ -144,6 +144,9 @@ struct winevtlog_channel *winevtlog_subscribe(const char *channel, struct winevt
144144 return NULL ;
145145 }
146146 ch -> signal_event = signal_event ;
147+ ch -> cancelled_by_us = FALSE;
148+ ch -> reconnect_needed = FALSE;
149+ ch -> last_error = 0 ;
147150
148151 if (stored_bookmark ) {
149152 ch -> bookmark = stored_bookmark ;
@@ -184,9 +187,16 @@ struct winevtlog_channel *winevtlog_subscribe(const char *channel, struct winevt
184187
185188BOOL cancel_subscription (struct winevtlog_channel * ch )
186189{
190+ ch -> cancelled_by_us = TRUE;
187191 return EvtCancel (ch -> subscription );
188192}
189193
194+ void winevtlog_request_cancel (struct winevtlog_channel * ch )
195+ {
196+ ch -> cancelled_by_us = TRUE;
197+ EvtCancel (ch -> subscription );
198+ }
199+
190200static void close_handles (struct winevtlog_channel * ch )
191201{
192202 int i ;
@@ -604,10 +614,21 @@ static int winevtlog_next(struct winevtlog_channel *ch, int hit_threshold)
604614
605615 if (!has_next ) {
606616 status = GetLastError ();
607- if (ERROR_CANCELLED == status ) {
617+ if (status == ERROR_CANCELLED ) {
618+ if (ch -> cancelled_by_us ) {
619+ /* Conmsume this flag and return early */
620+ ch -> cancelled_by_us = FALSE;
621+ return FLB_FALSE ;
622+ }
623+ ch -> reconnect_needed = TRUE;
624+ ch -> last_error = status ;
625+ flb_debug ("[in_winevtlog] subscription cancelled unexpectedly (err=%lu), will reconnect" , status );
608626 return FLB_FALSE ;
609627 }
610- if (ERROR_NO_MORE_ITEMS != status ) {
628+ if (status != ERROR_NO_MORE_ITEMS ) {
629+ ch -> reconnect_needed = TRUE;
630+ ch -> last_error = status ;
631+ flb_warn ("[in_winevtlog] EvtNext failed (err=%lu), will reconnect" , status );
611632 return FLB_FALSE ;
612633 }
613634
@@ -627,6 +648,126 @@ static int winevtlog_next(struct winevtlog_channel *ch, int hit_threshold)
627648 return FLB_FALSE ;
628649}
629650
651+ int winevtlog_reconnect (struct winevtlog_channel * ch , struct winevtlog_config * ctx )
652+ {
653+ HANDLE new_signal = NULL ;
654+ EVT_HANDLE new_remote = NULL ;
655+ EVT_HANDLE new_sub = NULL ;
656+ DWORD flags = 0 ;
657+ DWORD err = 0 ;
658+ PWSTR wide_channel = NULL ;
659+ PWSTR wide_query = NULL ;
660+ DWORD len ;
661+
662+ len = MultiByteToWideChar (CP_UTF8 , 0 , ch -> name , -1 , NULL , 0 );
663+ if (len == 0 ) {
664+ return -1 ;
665+ }
666+ wide_channel = flb_malloc (sizeof (WCHAR ) * len );
667+ if (!wide_channel ) {
668+ return -1 ;
669+ }
670+ MultiByteToWideChar (CP_UTF8 , 0 , ch -> name , -1 , wide_channel , len );
671+
672+ if (ch -> query ) {
673+ len = MultiByteToWideChar (CP_UTF8 , 0 , ch -> query , -1 , NULL , 0 );
674+ if (len == 0 ) {
675+ flb_free (wide_channel );
676+ return -1 ;
677+ }
678+ wide_query = flb_malloc (sizeof (WCHAR ) * len );
679+ if (!wide_query ) {
680+ flb_free (wide_channel );
681+ return -1 ;
682+ }
683+ MultiByteToWideChar (CP_UTF8 , 0 , ch -> query , -1 , wide_query , len );
684+ }
685+
686+ new_signal = CreateEvent (NULL , TRUE, TRUE, NULL );
687+ if (!new_signal ) {
688+ flb_free (wide_channel );
689+ if (wide_query ) {
690+ flb_free (wide_query );
691+ }
692+ return -1 ;
693+ }
694+
695+ if (ch -> session ) {
696+ new_remote = create_remote_handle (ch -> session , & err );
697+ if (err != ERROR_SUCCESS || !new_remote ) {
698+ flb_plg_error (ctx -> ins , "reconnect: cannot create remote handle '%s' in %ls (err=%lu)" ,
699+ ch -> name , ch -> session -> server , err );
700+ CloseHandle (new_signal );
701+ flb_free (wide_channel );
702+ if (wide_query ) {
703+ flb_free (wide_query );
704+ }
705+ return -1 ;
706+ }
707+ }
708+
709+ if (ch -> bookmark ) {
710+ flags = EvtSubscribeStartAfterBookmark ;
711+ }
712+ else if (ctx -> read_existing_events ) {
713+ flags = EvtSubscribeStartAtOldestRecord ;
714+ }
715+ else {
716+ flags = EvtSubscribeToFutureEvents ;
717+ }
718+
719+ new_sub = EvtSubscribe (new_remote , new_signal , wide_channel , wide_query ,
720+ ch -> bookmark , NULL , NULL , flags );
721+ if (!new_sub ) {
722+ DWORD sub_err = GetLastError ();
723+ if (sub_err == ERROR_EVT_QUERY_RESULT_STALE ) {
724+ flb_plg_warn (ctx -> ins , "reconnect: bookmark stale on '%s' (err=%lu), falling back to latest" ,
725+ ch -> name , sub_err );
726+ flags = ctx -> read_existing_events ? EvtSubscribeStartAtOldestRecord
727+ : EvtSubscribeToFutureEvents ;
728+ new_sub = EvtSubscribe (new_remote , new_signal , wide_channel , wide_query ,
729+ NULL , NULL , NULL , flags );
730+ }
731+ }
732+
733+ if (!new_sub ) {
734+ DWORD sub_err = GetLastError ();
735+ flb_plg_error (ctx -> ins , "reconnect: EvtSubscribe failed on '%s' (err=%lu)" , ch -> name , sub_err );
736+ if (new_remote ) EvtClose (new_remote );
737+ CloseHandle (new_signal );
738+ flb_free (wide_channel );
739+ if (wide_query ) {
740+ flb_free (wide_query );
741+ }
742+ return -1 ;
743+ }
744+
745+ if (ch -> subscription ) {
746+ EvtClose (ch -> subscription );
747+ }
748+ if (ch -> remote ) {
749+ EvtClose (ch -> remote );
750+ }
751+ if (ch -> signal_event ) {
752+ CloseHandle (ch -> signal_event );
753+ }
754+
755+ ch -> subscription = new_sub ;
756+ ch -> remote = new_remote ;
757+ ch -> signal_event = new_signal ;
758+ ch -> reconnect_needed = FALSE;
759+ ch -> last_error = 0 ;
760+ ch -> count = 0 ;
761+
762+ flb_plg_debug (ctx -> ins , "reconnected subscription for '%s'" , ch -> name );
763+
764+ flb_free (wide_channel );
765+ if (wide_query ) {
766+ flb_free (wide_query );
767+ }
768+ return 0 ;
769+ }
770+
630771/*
631772 * Read from an open Windows Event Log channel.
632773 */
@@ -645,6 +786,7 @@ int winevtlog_read(struct winevtlog_channel *ch, struct winevtlog_config *ctx,
645786 PEVT_VARIANT string_inserts = NULL ;
646787 UINT count_inserts = 0 ;
647788 DWORD i = 0 ;
789+ int rc = 0 ;
648790
649791 while (winevtlog_next (ch , hit_threshold )) {
650792 for (i = 0 ; i < ch -> count ; i ++ ) {
@@ -699,6 +841,13 @@ int winevtlog_read(struct winevtlog_channel *ch, struct winevtlog_config *ctx,
699841
700842 * read = read_size ;
701843
844+ if (ch -> reconnect_needed ) {
845+ rc = winevtlog_reconnect (ch , ctx );
846+ if (rc != 0 ) {
847+ flb_plg_error (ctx -> ins , "reconnect attempt failed for '%s' (last_error=%lu)" ,
848+ ch -> name , ch -> last_error );
849+ }
850+ }
702851 return 0 ;
703852}
704853
0 commit comments