A web services application written in C# needs to monitor changes to the associated database and return an event to remote applications through Server-Sent Events. The database if based on Postgresql but the same concepts apply to other SQL databases.
There are many steps involved and all should be carefully followed.
- Using pgAdmin or psql command line, create a trigger function which will send back notifications to the connected application in JSON format. In pgAdmin, the trigger function goes under Schemas → Public → Trigger Functions.
Here is a sample script to create a trigger function:
CREATE OR REPLACE FUNCTION public.notifyonmytablechange()
RETURNS trigger
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
data JSON;
notification JSON;
BEGIN
-- TG_OP is a predefined parameter that contains INSERT, UPDATE or DELETE
-- When a row is added or updated, the data is held into the NEW parameter
-- When a row is deleted, the data is held into the OLD parameter
IF (TG_OP = 'DELETE') THEN
notification = json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'data', row_to_json(OLD));
ELSE
notification = json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'data', row_to_json(NEW));
END IF;
-- Now perform the actual notification
PERFORM pg_notify('datachange', notification::TEXT);
RETURN NEW;
END
$BODY$;
-- make sure the function owner is valid
ALTER FUNCTION public.notifyonmytablechange()
OWNER TO admin_user;
- Under Schemas → Tables and the table that you want to monitor, define the conditions that will call the trigger function that we just added:
-- Call the notifyonmytablechange function whenever the Value column changes
CREATE OR REPLACE TRIGGER "OnDataChange"
AFTER UPDATE OF "Value" -- "Value" is the column that we are monitoring
ON public."Devices" -- "Devices" is the table that we are monitoring
FOR EACH ROW
EXECUTE FUNCTION public.notifyonmytablechange();
- Now that the Postgres configuration is done, we can start with the C# code to handle the database events. Create a connection to the database and register a notification handler in the application initialization section:
// Create another DataSource to receive notifications from the Database
builder.Services.AddSingleton<NpgsqlDataSource>((sp) =>
{
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
return dataSourceBuilder.Build();
});
// Register the Database Notification Handler
builder.Services.AddSingleton<IPostgresNotificationHandler>(ExpoSampleContext.notificationHandler);
// Add the Background Service processing the Notifications
builder.Services.AddHostedService<PostgresNotificationService>();
- Add a Server-Sent-Events (SSE) handler to forward the events back to the client application:
// Custom endpoint to monitor the completion states of items using Postgres NOTIFY / LISTEN
app.MapGet("/done", GetItemCompletionState);
...
/// <summary>
/// Retrieves the completion state of an Item using SSE.
/// </summary>
/// <param name="Accept">The Accept header value.</param>
/// <param name="request">The HTTP request.</param>
/// <param name="response">The HTTP response.</param>
/// <returns>Streams or returns the Item state.</returns>
static async Task<IResult> GetItemCompletionState([FromHeader] string? Accept,
HttpContext context,
HttpResponse response
)
{
if (Accept is not null && Accept == "text/event-stream")
{
//
// case of SSE streaming
//
PostgresNotificationListener listener = new PostgresNotificationListener();
ExpoSampleContext.notificationHandler.AddListener(listener);
response.ContentType = "text/event-stream";
// This call has the effect of sending the response without chunking.
// This is to be compatible with the Firebase client libraries
response.Headers.TransferEncoding = "";
// loop until the request is cancelled by the client closing the connection or by a timeout.
while (context.RequestAborted.IsCancellationRequested == false)
{
SSEResponse resp = new SSEResponse { path = "/", data = (listener.eventPayload) };
// return the initial state
var json = JsonSerializer.Serialize(resp);
await response.WriteAsync($"event: put\ndata: {json}\n\n");
await response.Body.FlushAsync();
// monitor state changes
listener.ewh.WaitOne();
}
ExpoSampleContext.notificationHandler.RemoveListener(listener);
return TypedResults.Ok();
}
else
{
return TypedResults.InternalServerError();
}
}
- Implement the PostgresNotificationListener, PostgresNotificationHandler and PostgresNotificationService classes as follows:
/// <summary>
/// Handles a Notification received from a Postgres NOTIFY / LISTEN.
/// </summary>
public class PostgresNotificationListener
{
public EventWaitHandle ewh = new EventWaitHandle(false, EventResetMode.AutoReset);
public string eventPayload = "";
public ValueTask HandleNotificationAsync(string notification, CancellationToken cancellationToken)
{
eventPayload = notification;
ewh.Set();
return ValueTask.CompletedTask;
}
}
/// <summary>
/// Logs all Notifications received from a Postgres Channel.
/// </summary>
public class PostgresNotificationHandler
{
private List<PostgresNotificationListener> listeners = new List<PostgresNotificationListener>();
public void AddListener(PostgresNotificationListener listener) { listeners.Add(listener); }
public void RemoveListener(PostgresNotificationListener listener) { listeners.Remove(listener); }
public ValueTask HandleNotificationAsync(string notification, CancellationToken cancellationToken)
{
Trace.WriteLine("PostgresNotification, Payload = " + notification);
// send the notification to all listeners
foreach (var listener in listeners)
{
listener.HandleNotificationAsync(notification, cancellationToken);
}
return ValueTask.CompletedTask;
}
}
/// <summary>
/// This Service waits for Notifications received on a given Postgres Channel name.
/// </summary>
public class PostgresNotificationService : BackgroundService
{
private readonly PostgresNotificationHandler _postgresNotificationHandler;
private readonly NpgsqlDataSource _npgsqlDataSource;
public PostgresNotificationService(NpgsqlDataSource npgsqlDataSource, PostgresNotificationHandler postgresNotificationHandler)
{
_npgsqlDataSource = npgsqlDataSource;
_postgresNotificationHandler = postgresNotificationHandler;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// We are running both loops until either of them is stopped or runs dry ...
await Task
.WhenAny(SetupPostgresAsync(stoppingToken))
.ConfigureAwait(false);
// Initializes the Postgres Listener by issuing a LISTEN Command.
async Task SetupPostgresAsync(CancellationToken cancellationToken)
{
// Open a new Connection, which can be used to issue the LISTEN Command
// to the Postgres Database.
using var connection = await _npgsqlDataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
// If we receive a message from Postgres, we notify all listeners.
// In critical application where we don't want to skip any events,
// we would need to store the notification in a Channel (System.Threading.Channels)
// and process these asynchronously
connection.Notification += async (sender, x) =>
{
await _postgresNotificationHandler.HandleNotificationAsync(x.Payload, cancellationToken).ConfigureAwait(false);
};
// Register to the datachange notifications
using (var command = new NpgsqlCommand($"LISTEN datachange", connection))
{
await command
.ExecuteNonQueryAsync(cancellationToken)
.ConfigureAwait(false);
}
// Put the connection into the Wait State until the Cancellation is requested
while (!cancellationToken.IsCancellationRequested)
{
await connection
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
}
}
}
- And that’s it! The full web services sample application can be found on Github. We can use curl on the command line to test the application by running:
curl -N -H "Accept:text/event-stream" http://localhost:8080/done