...

JIRA: SAMZA-1714 (ASF JIRA)

Released: 

Problem

At runtime, application code can be initialized through InitableTask.init for the low-level API and InitableFunction.init for the high-level API. These init functions have access to a TaskContext object, which provides access to functionality supplied by the Samza framework, such as tables and metrics. For the high-level API, applications can also define their own runtime context objects through the ContextManager.

...

  • ApplicationDescriptor needs an API for specifying how to create application-defined context, and this description needs to be serializable so that it can be passed to every container
  • Applications should be able to specify container-level context

Proposed Changes

  1. The API for initializing low-level and high-level applications will be updated to accept a new Context object type. The structure of this Context clarifies the scope of each context object and provides consistent functionality across the low-level and high-level APIs.
  2. New methods will be added to ApplicationDescriptor for accepting factories that will be used to create application-defined context objects.

...

  • getJobContext: framework-provided context at the job level
    • getJobName: Name of the job
    • getJobId: Id for the instance of the job
    • getConfig: Configuration for the job
  • getContainerContext: framework-provided context at the container level
    • getContainerMetricsRegistry: used to register application metrics at the container level
    • getContainerModel: metadata about the container and its tasks
      • getProcessorId: id for this container
      • getTasks: mapping from task name to task model for the tasks on this container
  • getTaskContext: framework-provided context at the task level
    • getTaskMetricsRegistry: used to register application metrics at the task level
    • getStore(storeName): key-value store corresponding to the storeName
    • getTable(tableId): Table corresponding to the tableId
    • getTaskModel: metadata about the task
      • getTaskName: name of the task
      • getSystemStreamPartitions: get all of the SystemStreamPartitions for this task
      • getChangelogPartition: get the partition that is used for changelogs for this task
    • getCallbackScheduler: used to register/delete scheduled callbacks
    • setStartingOffset(systemStreamPartition, offset): set the offset to start processing at for a given systemStreamPartition
  • getApplicationContainerContext: application-provided context at the container level (must be cast to its concrete type by application code after it is accessed)
  • getApplicationTaskContext: application-provided context at the task level (must be cast to its concrete type by application code after it is accessed)
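The Context structure listed above can be sketched in Java as follows. This is a minimal, hypothetical illustration, not the Samza API: the real interfaces expose more methods (stores, tables, metrics registries, the callback scheduler, container and task models), whereas here the config is a plain Map, the processor ID and task name are flattened onto the container and task contexts, and MyContainerContext, SimpleContext, ContextDemo and the sample values are invented for the example. It shows application code reading framework-provided context at each scope and casting the application-defined container context to its concrete type.

```java
import java.util.Collections;
import java.util.Map;

// Simplified, hypothetical stand-ins for the proposed interfaces. Real Samza
// types expose more (stores, tables, metrics, callback scheduler, models).
interface JobContext {
  String getJobName();
  String getJobId();
  Map<String, String> getConfig(); // stand-in for a Config type
}

interface ContainerContext {
  String getProcessorId(); // flattened from getContainerModel().getProcessorId()
}

interface TaskContext {
  String getTaskName(); // flattened from getTaskModel().getTaskName()
}

// Base types for application-defined context; the framework only knows these.
interface ApplicationContainerContext {}
interface ApplicationTaskContext {}

interface Context {
  JobContext getJobContext();
  ContainerContext getContainerContext();
  TaskContext getTaskContext();
  ApplicationContainerContext getApplicationContainerContext();
  ApplicationTaskContext getApplicationTaskContext();
}

// Hypothetical application-defined context shared by all tasks in a container.
class MyContainerContext implements ApplicationContainerContext {
  final String clientName;
  MyContainerContext(String clientName) { this.clientName = clientName; }
}

// Minimal Context implementation used only to exercise the sketch.
class SimpleContext implements Context {
  private final JobContext job;
  private final ContainerContext container;
  private final TaskContext task;
  private final ApplicationContainerContext appContainer;
  private final ApplicationTaskContext appTask;

  SimpleContext(JobContext job, ContainerContext container, TaskContext task,
      ApplicationContainerContext appContainer, ApplicationTaskContext appTask) {
    this.job = job;
    this.container = container;
    this.task = task;
    this.appContainer = appContainer;
    this.appTask = appTask;
  }

  public JobContext getJobContext() { return job; }
  public ContainerContext getContainerContext() { return container; }
  public TaskContext getTaskContext() { return task; }
  public ApplicationContainerContext getApplicationContainerContext() { return appContainer; }
  public ApplicationTaskContext getApplicationTaskContext() { return appTask; }
}

class ContextDemo {
  // Reads framework context directly and casts the application-defined
  // container context to its concrete type.
  static String describe(Context context) {
    MyContainerContext app = (MyContainerContext) context.getApplicationContainerContext();
    return context.getJobContext().getJobName() + "/"
        + context.getContainerContext().getProcessorId() + "/"
        + context.getTaskContext().getTaskName() + " -> " + app.clientName;
  }

  // Builds a sample Context with hypothetical values; no task-level app context.
  static Context sample() {
    JobContext job = new JobContext() {
      public String getJobName() { return "page-view-job"; }
      public String getJobId() { return "1"; }
      public Map<String, String> getConfig() { return Collections.emptyMap(); }
    };
    return new SimpleContext(job, () -> "0", () -> "Partition 0",
        new MyContainerContext("shared-client"), null);
  }
}
```

Keeping the application-defined context behind a framework-known base type is what forces the cast noted in the list; the framework can hold and pass the object without knowing the application's concrete class.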

...

ApplicationDescriptor
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
  ...

  // Factory for application-defined context shared by all tasks in a container
  S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
  // Factory for application-defined context created for each task
  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
}
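Requiring factories rather than context instances means only the factory has to be part of the serializable application description; the context it creates may hold non-serializable resources and is built inside each container. The sketch below assumes a simplified factory whose create method takes only a processor ID, a stand-in for the framework context objects a real factory would receive. ClientContainerContext, ClientContainerContextFactory and FactoryDemo are hypothetical. A Java serialization round trip simulates shipping the factory to a container.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

interface ApplicationContainerContext {}

// Hypothetical, simplified factory shape: the processor ID stands in for the
// framework context objects a real factory would be given.
interface ApplicationContainerContextFactory<T extends ApplicationContainerContext>
    extends Serializable {
  T create(String processorId);
}

// Application-defined context holding a non-serializable resource, so it is
// created inside each container instead of being shipped with the job.
class ClientContainerContext implements ApplicationContainerContext {
  final Object client = new Object(); // stand-in for e.g. a connection pool
  final String owner;
  ClientContainerContext(String owner) { this.owner = owner; }
}

// The factory carries only serializable settings (here, a client name).
class ClientContainerContextFactory
    implements ApplicationContainerContextFactory<ClientContainerContext> {
  private static final long serialVersionUID = 1L;
  private final String clientName;
  ClientContainerContextFactory(String clientName) { this.clientName = clientName; }

  @Override
  public ClientContainerContext create(String processorId) {
    return new ClientContainerContext(clientName + "@" + processorId);
  }
}

class FactoryDemo {
  // Simulates shipping the factory to a container: serialize, then deserialize.
  static ApplicationContainerContextFactory<?> roundTrip(
      ApplicationContainerContextFactory<?> factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(factory);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ApplicationContainerContextFactory<?>) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("factory could not be serialized", e);
    }
  }
}
```

A factory of this shape is what an application would pass to withApplicationContainerContextFactory. The task-level factory would presumably follow the same pattern, creating one context per task.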

Implementation and Test Plan

...

Application context lifecycles

For ApplicationContainerContext, the start() method will be called before the init() methods of the Initable objects are called and before the container enters the run loop. The stop() method will be called after the run loop exits and before the close() methods of the Closable objects are called.

For ApplicationTaskContext, the start() method will be called after the init() methods of the Initable objects are called and before the container enters the run loop. The stop() method will be called after the run loop exits and before the close() methods of the Closable objects are called.
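The lifecycle ordering for the two application context types can be illustrated with a small simulation that records when each hook fires. The Lifecycle interface and SimulatedContainer class are hypothetical. The text does not say whether ApplicationTaskContext.stop() runs before or after ApplicationContainerContext.stop(), so the sketch assumes task-level stop first, mirroring start-up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle hooks for application-defined context.
interface Lifecycle {
  default void start() {}
  default void stop() {}
}

// Simulated container that records the order in which hooks fire.
// Stopping the task context before the container context is an assumption.
class SimulatedContainer {
  final List<String> events = new ArrayList<>();

  List<String> run() {
    Lifecycle containerContext = new Lifecycle() {
      public void start() { events.add("containerContext.start"); }
      public void stop() { events.add("containerContext.stop"); }
    };
    Lifecycle taskContext = new Lifecycle() {
      public void start() { events.add("taskContext.start"); }
      public void stop() { events.add("taskContext.stop"); }
    };

    containerContext.start(); // before any init() and before the run loop
    events.add("init");       // Initable.init() calls
    taskContext.start();      // after init(), before the run loop
    events.add("runLoop");    // process messages until shutdown
    taskContext.stop();       // after the run loop exits
    containerContext.stop();  // also after the run loop, before close()
    events.add("close");      // Closable.close() calls
    return events;
  }
}
```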

Compatibility, Deprecation, and Migration Plan

...