Handling Postgres Database Events from a C# Application

A web services application written in C# needs to monitor changes to its database and forward each change to remote applications through Server-Sent Events. The database is based on PostgreSQL, but the same concepts apply to other SQL databases.

The setup involves two steps in the database, three in the C# application, and a final test from the command line. They should be followed in order.

  1. Using pgAdmin or the psql command line, create a trigger function that sends notifications, in JSON format, to the connected application. In pgAdmin, the trigger function goes under Schemas → Public → Trigger Functions.
    Here is a sample script to create a trigger function:
CREATE OR REPLACE FUNCTION public.notifyonmytablechange()
    RETURNS trigger
    LANGUAGE 'plpgsql'
AS $BODY$
 
DECLARE
    data JSON;
    notification JSON;
BEGIN
-- TG_OP is a special variable that contains INSERT, UPDATE or DELETE
-- When a row is added or updated, the new row is held in the NEW variable
-- When a row is deleted, the old row is held in the OLD variable
IF (TG_OP = 'DELETE') THEN
  notification = json_build_object(
        'table', TG_TABLE_NAME,
        'action', TG_OP,
        'data', row_to_json(OLD));  
ELSE
  notification = json_build_object(
        'table', TG_TABLE_NAME,
        'action', TG_OP,
        'data', row_to_json(NEW));  
END IF;

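-- Now perform the actual notification.
-- In the default configuration, the payload must be shorter than 8000 bytes.
PERFORM pg_notify('datachange', notification::TEXT);
-- The return value of an AFTER row trigger is ignored (NEW is NULL for a DELETE)
RETURN NEW;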
END
$BODY$;

-- make sure the function owner is valid
ALTER FUNCTION public.notifyonmytablechange()
    OWNER TO admin_user;
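
On the C# side, the payload built by this function arrives as a JSON string with `table`, `action` and `data` keys. The following is a minimal sketch of how it could be parsed with System.Text.Json. The `ChangeNotification` record, the `NotificationParser` class and the `Id` column in the sample payload are hypothetical names used for illustration; only `Devices` and `Value` come from the trigger shown in this article.

```csharp
using System;
using System.Text.Json;

// Sample payload shaped like the output of json_build_object in the trigger function.
// The "Id" column is an assumption made for this example.
var sample = "{\"table\":\"Devices\",\"action\":\"UPDATE\",\"data\":{\"Id\":1,\"Value\":42}}";

var change = NotificationParser.Parse(sample);
Console.WriteLine($"{change.Action} on {change.Table}: Value = {change.Data.GetProperty("Value").GetInt32()}");

// Hypothetical DTO mirroring the three keys of the notification.
// The row itself is kept as a JsonElement because its columns depend on the table.
public record ChangeNotification(string Table, string Action, JsonElement Data);

public static class NotificationParser
{
    // The trigger emits lower-case keys, so property matching is case-insensitive.
    private static readonly JsonSerializerOptions Options =
        new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

    public static ChangeNotification Parse(string payload) =>
        JsonSerializer.Deserialize<ChangeNotification>(payload, Options)
            ?? throw new JsonException("The notification payload was empty.");
}
```

Keeping `data` as a `JsonElement` lets a single parser serve every monitored table; a typed row class could be substituted when only one table is involved.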
  2. Under Schemas → Tables, select the table that you want to monitor and define the trigger that calls the function we just added:
-- Call the notifyonmytablechange function whenever the Value column changes
-- (CREATE OR REPLACE TRIGGER requires PostgreSQL 14 or later)
CREATE OR REPLACE TRIGGER "OnDataChange"
    AFTER UPDATE OF "Value"    -- "Value" is the column that we are monitoring
    ON public."Devices"    -- "Devices" is the table that we are monitoring
    FOR EACH ROW
    EXECUTE FUNCTION public.notifyonmytablechange();
  3. Now that the Postgres configuration is done, we can write the C# code that handles the database events. In the application initialization section, create a data source for the database connection, then register the notification handler and the background service:
// Create a dedicated DataSource to receive notifications from the Database
builder.Services.AddSingleton<NpgsqlDataSource>((sp) =>
{
    var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
    return dataSourceBuilder.Build();
});

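// Register the Database Notification Handler under its concrete type,
// which is the type that PostgresNotificationService requests in its constructor
builder.Services.AddSingleton<PostgresNotificationHandler>(ExpoSampleContext.notificationHandler);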

// Add the Background Service processing the Notifications
builder.Services.AddHostedService<PostgresNotificationService>();
  4. Add a Server-Sent Events (SSE) endpoint to forward the events back to the client application:
// Custom endpoint to monitor the completion states of items using Postgres NOTIFY / LISTEN
app.MapGet("/done", GetItemCompletionState);
...
/// <summary>
/// Retrieves the completion state of an Item using SSE.
/// </summary>
/// <param name="Accept">The Accept header value.</param>
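/// <param name="context">The HTTP context, used to detect when the client disconnects.</param>
/// <param name="response">The HTTP response.</param>
/// <returns>A result once the SSE stream ends, or an error result if the client did not ask for an event stream.</returns>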
static async Task<IResult> GetItemCompletionState([FromHeader] string? Accept,
                    HttpContext context,
                    HttpResponse response
                    )
{
    if (Accept == "text/event-stream")
    {
        //
        // case of SSE streaming
        //
        PostgresNotificationListener listener = new PostgresNotificationListener();
        ExpoSampleContext.notificationHandler.AddListener(listener);

        response.ContentType = "text/event-stream";

        // This call has the effect of sending the response without chunking.
        // This is to be compatible with the Firebase client libraries
        response.Headers.TransferEncoding = "";

        try
        {
            // loop until the request is cancelled by the client closing the connection or by a timeout.
            while (!context.RequestAborted.IsCancellationRequested)
            {
                // send the current state (the first iteration sends the initial state)
                SSEResponse resp = new SSEResponse { path = "/", data = listener.eventPayload };
                var json = JsonSerializer.Serialize(resp);
                await response.WriteAsync($"event: put\ndata: {json}\n\n", context.RequestAborted);
                await response.Body.FlushAsync(context.RequestAborted);

                // wait for the next state change or for the request to be aborted,
                // so that a disconnected client does not leave this loop blocked forever
                WaitHandle.WaitAny(new WaitHandle[] { listener.ewh, context.RequestAborted.WaitHandle });
            }
        }
        catch (OperationCanceledException)
        {
            // the client disconnected while we were writing
        }
        finally
        {
            // always unregister, even if writing to the response failed
            ExpoSampleContext.notificationHandler.RemoveListener(listener);
        }

        // the response has already been streamed, so there is nothing left to write
        return TypedResults.Empty;
    }
    else
    {
        return TypedResults.InternalServerError();
    }
}
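
The endpoint writes each event by hand as `event: put`, a `data:` line and a blank line. As a rough sketch of that framing, the helper below builds one SSE frame and emits a separate `data:` line for each line of the payload, which is how the event-stream format carries multi-line data. The `Sse` class and `FormatEvent` method are hypothetical names, not part of the sample application.

```csharp
using System;
using System.Text;

// Prints one frame: an "event:" line, a "data:" line, then a blank line.
Console.Write(Sse.FormatEvent("put", "{\"path\":\"/\",\"data\":\"\"}"));

public static class Sse
{
    // Builds a single Server-Sent Events frame.
    // The event name is assumed to be a single line of text.
    public static string FormatEvent(string eventName, string data)
    {
        var sb = new StringBuilder();
        sb.Append("event: ").Append(eventName).Append('\n');

        // Each line of the payload needs its own "data:" field.
        foreach (var line in data.Split('\n'))
        {
            sb.Append("data: ").Append(line.TrimEnd('\r')).Append('\n');
        }

        // A blank line terminates the event.
        sb.Append('\n');
        return sb.ToString();
    }
}
```

Since `JsonSerializer.Serialize` produces a single line by default, the hand-written frame in the endpoint is equivalent for this application; the helper only matters if indented JSON or other multi-line payloads are sent.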
  5. Implement the PostgresNotificationListener, PostgresNotificationHandler and PostgresNotificationService classes as follows:
    /// <summary>
    /// Handles a Notification received from a Postgres NOTIFY / LISTEN.
    /// </summary>
    public class PostgresNotificationListener
    {
        public EventWaitHandle ewh = new EventWaitHandle(false, EventResetMode.AutoReset);
        public string eventPayload = "";

        public ValueTask HandleNotificationAsync(string notification, CancellationToken cancellationToken)
        {
            eventPayload = notification;
            ewh.Set();
            return ValueTask.CompletedTask;
        }
    }

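    /// <summary>
    /// Logs each Notification received from a Postgres Channel and forwards it to all registered listeners.
    /// </summary>
    public class PostgresNotificationHandler
    {
        private readonly List<PostgresNotificationListener> listeners = new List<PostgresNotificationListener>();

        // Listeners are added and removed from request threads while notifications
        // arrive from the background service, so access to the list is synchronized.
        public void AddListener(PostgresNotificationListener listener) { lock (listeners) { listeners.Add(listener); } }

        public void RemoveListener(PostgresNotificationListener listener) { lock (listeners) { listeners.Remove(listener); } }

        public async ValueTask HandleNotificationAsync(string notification, CancellationToken cancellationToken)
        {
            Trace.WriteLine("PostgresNotification, Payload = " + notification);

            // take a snapshot so that listeners can be added or removed while we iterate
            PostgresNotificationListener[] snapshot;
            lock (listeners) { snapshot = listeners.ToArray(); }

            // send the notification to all listeners
            foreach (var listener in snapshot)
            {
                await listener.HandleNotificationAsync(notification, cancellationToken).ConfigureAwait(false);
            }
        }

    }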

    /// <summary>
    /// This Service waits for Notifications received on a given Postgres Channel name.
    /// </summary>
    public class PostgresNotificationService : BackgroundService
    {
        private readonly PostgresNotificationHandler _postgresNotificationHandler;
        private readonly NpgsqlDataSource _npgsqlDataSource;

        public PostgresNotificationService(NpgsqlDataSource npgsqlDataSource, PostgresNotificationHandler postgresNotificationHandler)
        {
            _npgsqlDataSource = npgsqlDataSource;
            _postgresNotificationHandler = postgresNotificationHandler;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Run the listener until the host requests a shutdown. Awaiting the task
            // directly lets connection errors surface instead of being silently dropped.
            try
            {
                await SetupPostgresAsync(stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: the wait on the connection was cancelled by the host.
            }

            // Initializes the Postgres Listener by issuing a LISTEN Command.
            async Task SetupPostgresAsync(CancellationToken cancellationToken)
            {
                // Open a new Connection, which can be used to issue the LISTEN Command
                // to the Postgres Database.
                await using var connection = await _npgsqlDataSource
                    .OpenConnectionAsync(cancellationToken)
                    .ConfigureAwait(false);

                // If we receive a message from Postgres, we notify all listeners.
                // In a critical application where we don't want to skip any events,
                // we would need to store the notifications in a Channel (System.Threading.Channels)
                // and process them asynchronously.
                connection.Notification += async (sender, x) =>
                {
                    await _postgresNotificationHandler.HandleNotificationAsync(x.Payload, cancellationToken).ConfigureAwait(false);
                };

                // Register to the datachange notifications
                using (var command = new NpgsqlCommand("LISTEN datachange", connection))
                {
                    await command
                        .ExecuteNonQueryAsync(cancellationToken)
                        .ConfigureAwait(false);
                }

                // Put the connection into the Wait State until the Cancellation is requested
                while (!cancellationToken.IsCancellationRequested)
                {
                    await connection
                        .WaitAsync(cancellationToken)
                        .ConfigureAwait(false);
                }
            }
        }
    }
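
The comment in the notification callback mentions System.Threading.Channels as a way to avoid skipping events: the listener shown above keeps only the latest payload, so two notifications arriving in quick succession can overwrite each other. A minimal sketch of that idea follows. The `NotificationQueue` class is a hypothetical name; in the sample application each listener would presumably own one such queue, with the handler calling `Enqueue` and the SSE loop reading from it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

// Producer side: what the notification callback would do for each payload.
var queue = new NotificationQueue();
queue.Enqueue("{\"action\":\"UPDATE\",\"data\":{\"Value\":1}}");
queue.Enqueue("{\"action\":\"UPDATE\",\"data\":{\"Value\":2}}");
queue.Complete(); // only so that this demo terminates

// Consumer side: what the SSE loop would do instead of waiting on a handle.
await foreach (var payload in queue.ReadAllAsync())
{
    Console.WriteLine(payload);
}

public sealed class NotificationQueue
{
    // Unbounded so that the producer never blocks; every payload is kept in order.
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions { SingleReader = true });

    // Returns false only after the queue has been completed.
    public bool Enqueue(string payload) => _channel.Writer.TryWrite(payload);

    public void Complete() => _channel.Writer.Complete();

    // Yields payloads as they arrive and ends once the queue is completed and drained.
    public IAsyncEnumerable<string> ReadAllAsync(CancellationToken cancellationToken = default) =>
        _channel.Reader.ReadAllAsync(cancellationToken);
}
```

An unbounded channel trades memory for completeness; if a slow client must not accumulate an arbitrary backlog, `Channel.CreateBounded` with a drop policy would be the alternative to consider.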
  6. And that’s it! The full web services sample application can be found on GitHub. To test the application, run curl from the command line:
curl -N -H "Accept:text/event-stream" http://localhost:8080/done
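
For a client written in C# rather than curl, the stream returned by the endpoint has to be split back into events. The sketch below shows a simplified parser that handles only the `event:` and `data:` fields used in this article; the `SseReader` class is a hypothetical name, and the `StringReader` stands in for the response stream of a real HTTP request.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Stand-in for the body of a response from the /done endpoint.
var body = new StringReader("event: put\ndata: {\"path\":\"/\",\"data\":\"\"}\n\n");

foreach (var evt in SseReader.ReadEvents(body))
{
    Console.WriteLine($"{evt.Name}: {evt.Data}");
}

public static class SseReader
{
    // Reads events until the end of the stream. Comments, "id:" and "retry:"
    // fields are ignored in this simplified version.
    public static IEnumerable<(string Name, string Data)> ReadEvents(TextReader reader)
    {
        var name = "message"; // default event type when no "event:" field is sent
        var data = new List<string>();
        string? line;

        while ((line = reader.ReadLine()) != null)
        {
            if (line.Length == 0)
            {
                // A blank line dispatches the event collected so far.
                if (data.Count > 0) yield return (name, string.Join("\n", data));
                name = "message";
                data.Clear();
            }
            else if (line.StartsWith("event:", StringComparison.Ordinal))
            {
                name = StripLeadingSpace(line.Substring(6));
            }
            else if (line.StartsWith("data:", StringComparison.Ordinal))
            {
                data.Add(StripLeadingSpace(line.Substring(5)));
            }
        }
    }

    // The format allows one optional space after the colon.
    private static string StripLeadingSpace(string value) =>
        value.StartsWith(' ') ? value.Substring(1) : value;
}
```

With a live server, the same method could be fed a `StreamReader` wrapped around the response stream of an `HttpClient` request that sets the `Accept: text/event-stream` header.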