Mastering Azure Serverless Computing

Creating a custom trigger

Before you start implementing your custom trigger, it is important to understand which classes are involved in the trigger pipeline and what each one is responsible for:

  • TriggerConfigProvider: This implements the IExtensionConfigProvider interface and it has the responsibility of configuring the entire trigger process in terms of:
    • The attribute that defines the trigger
    • The payload associated with the trigger (for example, the HTTP request for HttpTrigger)
    • The binding provider class used to create the binding object
  • TriggerAttribute: This is used to decorate a method parameter to identify the function trigger. It inherits from Attribute and is decorated by BindingAttribute. It has the responsibility of containing the trigger data (for example, the schedule expression for TimerTrigger).
  • TriggerBindingProvider: This implements the ITriggerBindingProvider interface and has the responsibility of creating the actual binding object. It is the factory class. In this class, you can read the configuration data from the settings and pass them to the binding class.
  • TriggerBinding: This implements the ITriggerBinding interface and is responsible for creating the listener instance.
  • TriggerListener: This implements the IListener interface and has the responsibility of reacting to events and executing the function. There is a listener for each function you define in your code.

The following diagram shows the interactions between the classes mentioned previously in the different phases of the binding process:

To better clarify the mechanism behind triggers, we'll try to implement a trigger that executes a function when the temperature measured in a city changes:

  1. Let's start by implementing the trigger attribute:
[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public class WeatherTriggerAttribute : Attribute
{
    public WeatherTriggerAttribute(string cityName, double temperatureThreshold)
    {
        CityName = cityName;
        TemperatureThreshold = temperatureThreshold;
    }

    public string ApiKey { get; set; }
    public string CityName { get; internal set; }
    public double TemperatureThreshold { get; internal set; }
}

We would like to use OpenWeatherMap services to retrieve the weather information (including the temperature) for a specific city, so we need the city name (for example, "Rome, IT" for Rome in Italy), the API key to use the OpenWeatherMap API, and the temperature threshold (that is, the difference, in degrees Celsius, that fires the trigger).

OpenWeatherMap provides you with a set of REST APIs to retrieve weather data. It has a free plan that allows you to make 60 calls per minute. You can find more information about it at https://openweathermap.org.

When the function is triggered, the trigger sends us the weather temperature using the following payload:

public class WeatherPayload
{
    public string CityName { get; set; }

    public double CurrentTemperature { get; set; }

    public double LastTemperature { get; set; }

    public DateTimeOffset Timestamp { get; internal set; }
}

So, an Azure Function that uses our trigger is as follows:

[FunctionName(nameof(RomeWeatherCheck))]
public static void RomeWeatherCheck(
    [WeatherTrigger("Rome,IT", 0.1)] WeatherPayload req,
    ILogger log)
{
    var message = $"{req.CityName} [{req.CurrentTemperature}] at {req.Timestamp}";
    log.LogWarning(message);
}

And its execution looks like this:

Now we'll analyze all the classes we need to complete the whole process.

  1. Let's start with TriggerConfigProvider:
[Extension("Weather")]
public class WeatherTriggerConfigProvider : IExtensionConfigProvider
{
    // .... Field definitions (look at GitHub repo for the full code)

    public WeatherTriggerConfigProvider(INameResolver nameResolver, ILoggerFactory loggerFactory, IWeatherService weatherService)
    {
        // .... dependency assignment (look at GitHub repo for the full code)
    }

    public void Initialize(ExtensionConfigContext context)
    {
        var triggerAttributeBindingRule = context.AddBindingRule<WeatherTriggerAttribute>();
        triggerAttributeBindingRule.BindToTrigger<WeatherPayload>(
            new WeatherTriggerBindingProvider(this._nameResolver, this._loggerFactory, this._weatherService));
    }
}

WeatherTriggerConfigProvider is decorated with the Extension attribute to mark it as an extension, and the Initialize method is where we define our binding rule. In this case, we add a binding rule for the trigger attribute and set the trigger binding with the BindToTrigger method. This method tells the runtime that we use the WeatherPayload class to carry the trigger data and WeatherTriggerBindingProvider as the provider that generates the binding class.

  2. Once we have defined our configuration provider, we have to add it to the binding pipeline. To do this, we use the startup method seen previously:
[assembly: WebJobsStartup(typeof(ExtensionsStartup))]
public class ExtensionsStartup : IWebJobsStartup
{
    public void Configure(IWebJobsBuilder builder)
    {
        builder.AddExtension<WeatherTriggerConfigProvider>();
    }
}

In a real application, the startup method contains a lot of code (for example, the registration of the services in the dependency resolver container or a more complex configuration of the extensions), and for this reason, generally, extension methods are implemented for the registration of individual extensions:

[assembly: WebJobsStartup(typeof(ExtensionsStartup))]
public class ExtensionsStartup : IWebJobsStartup
{
    public void Configure(IWebJobsBuilder builder)
    {
        builder.UseWeatherTrigger();
        // Other configurations
    }
}

public static class WebJobBuilderExtensions
{
    public static IWebJobsBuilder UseWeatherTrigger(this IWebJobsBuilder builder)
    {
        // A null argument is reported with ArgumentNullException, not NullReferenceException
        if (builder == null)
            throw new ArgumentNullException(nameof(builder));

        builder.AddExtension<WeatherTriggerConfigProvider>();
        // Add here all the configuration code for the extension

        return builder;
    }
}

You will notice that the constructor of the WeatherTriggerConfigProvider class has three parameters but there is no trace of these parameters in the registration (the AddExtension method). This is because you can take advantage of the type resolution natively supported by the WebJobs SDK. In this case, when the platform has to create an instance of WeatherTriggerConfigProvider, it will try to resolve the types automatically. The INameResolver (used to read the configuration) and ILoggerFactory types are registered automatically by the platform, while for the resolution of IWeatherService (our service for access to the OpenWeatherMap API), it is necessary to use the type registration:

builder.Services.AddTransient<IWeatherService, WeatherService>();

  3. The next class we create is the WeatherTriggerBindingProvider class:
public class WeatherTriggerBindingProvider : ITriggerBindingProvider
{
    // .... Field definitions (look at GitHub repo for the full code)

    public WeatherTriggerBindingProvider(INameResolver nameResolver,
        ILoggerFactory loggerFactory, IWeatherService weatherService)
    {
        // .... dependency assignment (look at GitHub repo for the full code)
    }

    public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
    {
        // TryCreateAsync implementation
    }

    private string GetTriggerAttributeApiKey(WeatherTriggerAttribute triggerAttribute)
    {
        // retrieves the configuration for the trigger
    }
}

In general, the implementation of TryCreateAsync will do the following:

    • Inspect ParameterInfo to see whether it has the specific binding attribute applied to it (in this case, WeatherTriggerAttribute)
    • Determine whether the parameter type is supported by the binding. If so, the provider constructs and returns an ITriggerBinding instance (WeatherTriggerBinding in this case):
public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
    if (context is null)
        throw new ArgumentNullException(nameof(context));

    var parameter = context.Parameter;
    var triggerAttribute = parameter.GetCustomAttribute<WeatherTriggerAttribute>(inherit: false);
    if (triggerAttribute is null)
        return Task.FromResult<ITriggerBinding>(null);

    triggerAttribute.ApiKey = GetTriggerAttributeApiKey(triggerAttribute);

    return Task.FromResult<ITriggerBinding>(
        new WeatherTriggerBinding(parameter, _nameResolver, _weatherService, triggerAttribute));
}
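
The attribute lookup at the start of TryCreateAsync is plain .NET reflection, so it can be tried without the WebJobs SDK. The following is a minimal sketch under that assumption: SampleTriggerAttribute, SampleFunctions, and AttributeInspector are hypothetical stand-ins invented for this illustration and are not part of the book's sample.

```csharp
using System;
using System.Reflection;

// Hypothetical, simplified stand-in for the trigger attribute (no SDK dependency).
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class SampleTriggerAttribute : Attribute
{
    public SampleTriggerAttribute(string cityName) => CityName = cityName;

    public string CityName { get; }
}

public static class SampleFunctions
{
    // A parameter decorated with the attribute, as a function would declare it.
    public static void WithTrigger([SampleTrigger("Rome,IT")] string payload) { }

    // A parameter without the attribute: a provider would return null for it.
    public static void WithoutTrigger(string payload) { }
}

public static class AttributeInspector
{
    // Returns the attribute found on the first parameter of the method, or null.
    public static SampleTriggerAttribute FindTrigger(string methodName)
    {
        ParameterInfo parameter = typeof(SampleFunctions)
            .GetMethod(methodName)
            .GetParameters()[0];

        return parameter.GetCustomAttribute<SampleTriggerAttribute>(inherit: false);
    }
}
```

Calling AttributeInspector.FindTrigger("WithTrigger") yields the attribute instance with its CityName, while FindTrigger("WithoutTrigger") yields null, which is the case where a binding provider declines the parameter.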

In this method, you can also validate the triggerAttribute parameter or read values from app settings (using the nameResolver instance passed in the constructor): 

private string GetTriggerAttributeApiKey(WeatherTriggerAttribute triggerAttribute)
{
    if (string.IsNullOrEmpty(triggerAttribute.ApiKey))
    {
        var apiKey = _nameResolver.Resolve("Weather.ApiKey");
        if (string.IsNullOrEmpty(apiKey))
            throw new InvalidOperationException("ApiKey is mandatory");
        return apiKey;
    }
    return triggerAttribute.ApiKey;
}
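
The precedence applied here (a value set on the attribute wins, otherwise the app setting is used, otherwise the trigger fails fast) can be exercised in isolation. This is only a sketch: ISettingResolver and DictionaryResolver are hypothetical stand-ins that mimic the single Resolve(string) method of INameResolver, and ApiKeyLookup is an invented helper.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for INameResolver: returns a setting value, or null when missing.
public interface ISettingResolver
{
    string Resolve(string name);
}

// Hypothetical resolver backed by an in-memory dictionary instead of app settings.
public sealed class DictionaryResolver : ISettingResolver
{
    private readonly IReadOnlyDictionary<string, string> _settings;

    public DictionaryResolver(IReadOnlyDictionary<string, string> settings) => _settings = settings;

    public string Resolve(string name) =>
        _settings.TryGetValue(name, out var value) ? value : null;
}

public static class ApiKeyLookup
{
    // Same precedence as the method above: attribute value first, then the setting.
    public static string GetApiKey(string attributeValue, ISettingResolver resolver)
    {
        if (!string.IsNullOrEmpty(attributeValue))
            return attributeValue;

        var fromSettings = resolver.Resolve("Weather.ApiKey");
        if (string.IsNullOrEmpty(fromSettings))
            throw new InvalidOperationException("ApiKey is mandatory");

        return fromSettings;
    }
}
```

Failing at binding time rather than at the first poll means a missing key surfaces when the host starts, not a minute later inside the listener.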

The TryCreateAsync method returns the instance of ITriggerBinding the runtime uses to create the listener. In our scenario, ITriggerBinding is implemented by the WeatherTriggerBinding class:

public class WeatherTriggerBinding : ITriggerBinding
{
    public Type TriggerValueType => typeof(WeatherPayload);
    public IReadOnlyDictionary<string, Type> BindingDataContract { get; } = new Dictionary<string, Type>();
    // .... other Field definitions (look at GitHub repo for the full code)

    public WeatherTriggerBinding(ParameterInfo parameter, INameResolver nameResolver, IWeatherService weatherService, WeatherTriggerAttribute attribute)
    {
        // .... dependency assignment (look at GitHub repo for the full code)
    }

    public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
    {
        return Task.FromResult<ITriggerData>(new TriggerData(null, new Dictionary<string, object>()));
    }

    // This is the most important method of the class. It has the responsibility to create the listener for the function.
    public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
    {
        return Task.FromResult<IListener>(new WeatherTriggerListener(context.Executor, this._weatherService, this._attribute));
    }

    public ParameterDescriptor ToParameterDescriptor()
    {
        // look at GitHub repo for the full code
    }
}

The ITriggerBinding interface exposes three methods and two properties.

The BindAsync and BindingDataContract pair allows you to create triggers that support binding expressions in the trigger parameters. You can see an example of binding expressions in the following snippet of code:

[FunctionName("ResizeImage")]
public static void Run(
    [BlobTrigger("sample-images/{filename}")] Stream image,
    [Blob("sample-images-sm/{filename}", FileAccess.Write)] Stream imageSmall,
    string filename,
    ILogger log)
{
    log.LogInformation(
        $"Blob trigger processing: {filename}");
    // ...
}

The {filename} expression in the BlobTrigger parameter is a binding expression. The BindingDataContract property exposes a dictionary (of string and Type) that enumerates all the allowed binding expressions for the trigger. If you look at the BlobTrigger implementation in the GitHub repository, you will find the following:

private static IReadOnlyDictionary<string, Type> CreateBindingDataContract(IBlobPathSource path)
{
    var contract = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);
    contract.Add("BlobTrigger", typeof(string));
    contract.Add("Uri", typeof(Uri));
    contract.Add("Properties", typeof(BlobProperties));
    contract.Add("Metadata", typeof(IDictionary<string, string>));

    IReadOnlyDictionary<string, Type> contractFromPath = path.CreateBindingDataContract();

    if (contractFromPath != null)
    {
        foreach (KeyValuePair<string, Type> item in contractFromPath)
        {
            // In case of conflict, binding data from the value type overrides the built-in binding data above.
            contract[item.Key] = item.Value;
        }
    }

    return contract;
}

The BindAsync method is called by the runtime during the binding phase to convert the trigger parameters into an ITriggerData instance using the BindingDataContract seen previously.

You can find the implementation of BlobTrigger in the GitHub repository at https://github.com/Azure/azure-webjobs-sdk.

In our sample, we don't support binding expressions, so we return an empty dictionary for the BindingDataContract property and a TriggerData instance without any data for the BindAsync method.
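
If the weather trigger were extended to support binding expressions such as {CityName}, the contract and the per-event binding data would need matching keys. The sketch below uses only plain dictionaries to show that pairing; SamplePayload and WeatherBindingData are hypothetical names, and the choice of exposed keys is an assumption made for illustration, not something the book's sample does.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, reduced version of the trigger payload.
public sealed class SamplePayload
{
    public string CityName { get; set; }
    public double CurrentTemperature { get; set; }
}

public static class WeatherBindingData
{
    // Names and types that binding expressions such as {CityName} could reference.
    // A trigger binding would return this from its BindingDataContract property.
    public static IReadOnlyDictionary<string, Type> CreateContract() =>
        new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase)
        {
            ["CityName"] = typeof(string),
            ["CurrentTemperature"] = typeof(double),
        };

    // Values for one trigger event; the keys must match the contract.
    // A trigger binding would pass this dictionary to TriggerData in BindAsync.
    public static IReadOnlyDictionary<string, object> CreateBindingData(SamplePayload payload) =>
        new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
        {
            ["CityName"] = payload.CityName,
            ["CurrentTemperature"] = payload.CurrentTemperature,
        };
}
```

The case-insensitive comparer mirrors the one used in the BlobTrigger contract shown earlier, so {cityname} and {CityName} would resolve to the same entry.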

CreateListenerAsync is the most important method of the class: it is responsible for creating the listener for the function. The listener should be initialized with all the parameters mandatory for the function (in the previous sample, the weather service and the attribute information) and with the ITriggeredFunctionExecutor instance used to invoke the function from inside the listener when needed.

The ToParameterDescriptor method has the responsibility of returning a ParameterDescriptor instance that describes the trigger parameter. The descriptor can be used by some tools to display parameter information on dashboards.

Finally, the TriggerValueType property defines the type of value returned by the trigger—in our sample, the WeatherPayload we use in the function signature.

  4. The last class we have to implement is the listener, the heart of the trigger process. It is responsible for checking whether the trigger event is fired (in our sample, if the selected city temperature increases by a configured threshold) and if it is, it executes the function:
public class WeatherTriggerListener : IListener
{
    // .... Field definitions (look at GitHub repo for the full code)
    private CancellationTokenSource _listenerStoppingTokenSource;
    private Task _listenerTask;

    public WeatherTriggerListener(ITriggeredFunctionExecutor executor,
        IWeatherService weatherService, WeatherTriggerAttribute attribute)
    {
        // .... dependency assignment (look at GitHub repo for the full code)
    }

    public void Cancel()
    {
        // .....
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // .....
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // .....
    }

    // Required because IListener extends IDisposable
    public void Dispose()
    {
        // .....
    }
}

The IListener interface declares only three methods of its own (it also inherits Dispose from IDisposable); they allow the runtime to manage the listener.

The Cancel and StopAsync methods are used by the runtime to close the listener (for example, because the function host needs to be moved to another server). The listener needs to manage the async pattern used for function invocation in the right way, and this is the reason why you find the CancellationToken in the parameter of the IListener interface methods:

public void Cancel()
{
    StopAsync(CancellationToken.None).Wait();
}

public async Task StopAsync(CancellationToken cancellationToken)
{
    if (_listenerTask == null)
        return;
    try
    {
        _listenerStoppingTokenSource.Cancel();
    }
    finally
    {
        await Task.WhenAny(_listenerTask, Task.Delay(Timeout.Infinite, cancellationToken));
    }
}

The StartAsync method is used by the runtime, in the startup phase, to start the listener. The listener is like a game loop that periodically checks whether the trigger event is fired:

public Task StartAsync(CancellationToken cancellationToken)
{
    _listenerStoppingTokenSource = new CancellationTokenSource();
    var token = _listenerStoppingTokenSource.Token;

    // Task.Run unwraps the async delegate, so _listenerTask completes only
    // when the listener loop ends. TaskFactory.StartNew would return the
    // outer Task<Task>, which completes at the first await in the loop.
    _listenerTask = Task.Run(() => ListenerAction(token), token);

    return _listenerTask.IsCompleted ? _listenerTask : Task.CompletedTask;
}

private async Task ListenerAction(CancellationToken token)
{
    // this is the code that the listener uses to check whether the trigger has fired
}
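
The start and stop handshake can be tried outside the Functions host. The following is a minimal sketch, assuming a hypothetical PollingLoop class (not part of the WebJobs SDK or the book's sample) in which a caller-supplied delegate stands in for the weather check and function invocation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-alone version of the listener's start/stop mechanics.
public sealed class PollingLoop
{
    private readonly Func<CancellationToken, Task> _onTick;
    private readonly TimeSpan _interval;
    private CancellationTokenSource _stoppingSource;
    private Task _loopTask;

    public PollingLoop(Func<CancellationToken, Task> onTick, TimeSpan interval)
    {
        _onTick = onTick;
        _interval = interval;
    }

    public Task StartAsync()
    {
        _stoppingSource = new CancellationTokenSource();
        var token = _stoppingSource.Token;

        // Task.Run returns a task that completes only when RunAsync has finished.
        _loopTask = Task.Run(() => RunAsync(token));
        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_loopTask == null)
            return;

        _stoppingSource.Cancel();

        // Wait for the loop to finish, or give up when the caller's token fires.
        await Task.WhenAny(_loopTask, Task.Delay(Timeout.Infinite, cancellationToken));
    }

    private async Task RunAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                await _onTick(token);
                await Task.Delay(_interval, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown: Task.Delay throws when the token is cancelled.
        }
    }
}
```

Swallowing OperationCanceledException inside the loop is a design choice in this sketch: it lets the loop task end in a completed state, so a caller that awaits it during shutdown does not observe a cancellation as a failure.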

The StartAsync method simply starts a task that is an infinite loop (closed by the CancellationToken cancellation request). Inside the loop, we use the weather service to retrieve the city weather information and if the city temperature increased by the threshold (which is our trigger event), we create the weather payload and execute the function using the ITriggeredFunctionExecutor instance passed from the trigger binding:

private async Task ListenerAction(CancellationToken token)
{
    this._weatherService.ApiKey = this._attribute.ApiKey;
    var cityData = new CityInfo();
    double lastTemperature = 0;

    while (!token.IsCancellationRequested)
    {
        // Get the weather info and check whether the city temperature is over the threshold (look at GitHub repo for the full code)
        // If the listener needs to fire the function (the temperature is over the threshold), use the next statement to do so
        await _executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = weatherPayload }, token);
        await Task.Delay(TimeSpan.FromMinutes(1), token);
    }
}
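
The threshold check itself is elided in the listing above (the full code is in the GitHub repo). As an illustration only, one possible reading of "the difference, in degrees Celsius, that fires the trigger" is an absolute difference against the last reported temperature. The TemperatureChangeDetector class below is hypothetical, and treating the first reading as a baseline that never fires is an assumption of this sketch.

```csharp
using System;

// Hypothetical helper: decides whether a new reading should fire the trigger.
public sealed class TemperatureChangeDetector
{
    private readonly double _threshold;
    private double? _lastTemperature;

    public TemperatureChangeDetector(double threshold) => _threshold = threshold;

    // Returns true when the reading differs from the last reported one
    // by at least the threshold (in either direction).
    public bool ShouldFire(double currentTemperature)
    {
        if (_lastTemperature is null)
        {
            // First reading: store it as the baseline without firing.
            _lastTemperature = currentTemperature;
            return false;
        }

        if (Math.Abs(currentTemperature - _lastTemperature.Value) < _threshold)
            return false;

        // The baseline moves only when the trigger fires, so slow drifts accumulate.
        _lastTemperature = currentTemperature;
        return true;
    }
}
```

With a threshold of 0.5, readings of 20.0, 20.2, 20.6, and 20.7 give false, false, true, and false: the third reading is 0.6 away from the 20.0 baseline, and the fourth is only 0.1 away from the new 20.6 baseline.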

Remember that there is one listener for each function, so the scalability of the function depends on how you implement the trigger. In the loop shown previously, the listener awaits the function execution and checks for another trigger event only once that execution has finished. This is why functions should not run for a long time. If you know that your functions may run for a long time, you have to write the listener accordingly (for example, you could start a new task that performs the long-running operation, exploiting multithreading so that the listener is not blocked).
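
One way to keep the listener loop responsive is to start each execution without awaiting it and to track the tasks that are still running. The sketch below is an assumption about how that could look, not the book's implementation: BackgroundDispatcher is a hypothetical class, and the work delegate stands in for the call to the function executor.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs work in the background and tracks what is still in flight.
public sealed class BackgroundDispatcher
{
    private readonly ConcurrentDictionary<int, Task> _inFlight = new ConcurrentDictionary<int, Task>();
    private int _nextId;

    public int InFlightCount => _inFlight.Count;

    // Starts the work and returns immediately, so the caller's loop is not blocked.
    public void Dispatch(Func<CancellationToken, Task> work, CancellationToken token)
    {
        int id = Interlocked.Increment(ref _nextId);
        Task task = Task.Run(() => work(token));

        // Register first, then attach the cleanup, so removal always follows the add.
        _inFlight[id] = task;
        task.ContinueWith(t => _inFlight.TryRemove(id, out _), TaskScheduler.Default);
    }

    // Lets a shutdown path wait for the executions that are still running.
    public Task WhenAllAsync() => Task.WhenAll(_inFlight.Values);
}
```

The trade-off is that executions can now overlap, so the function must tolerate concurrent invocations, and a real listener would also want to log failures of the dispatched tasks and cap how many run at once.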