# Reports Aggregation

Counts statuses of reports belonging to one session and step and propagates a
final result to the
[Import API](https://gitlab.switch.ch/memoriav/memobase-2020/services/import-process/import-api).

## Setup

The following settings should be applied to the service via environment
variables:

* `APPLICATION_ID`: ID used for identifying the application inside the Kafka
  broker
* `TOPIC_IN`: Kafka input topic (i.e. the topic to which the reports are
  written)
* `IMPORT_API_DOMAIN`: Domain name (without scheme) of the Import API endpoint
  to which the aggregations are sent
* `IMPORT_API_PORT`: Port of the Import API
* `ELAPSED_TIME_MULTIPLIER`: Factor by which the maximal elapsed stream time
  between two reports in one aggregation is multiplied (optional, defaults to
  `"1.0"`)
* `MINIMAL_INACTIVITY_GAP`: Minimal duration in milliseconds in which an
  aggregation hasn't collected any new reports before being considered
  "inactive" (optional, defaults to `"0"`)
* `MAXIMAL_INACTIVITY_GAP`: Maximal duration in milliseconds in which an
  aggregation hasn't collected any new reports before being considered
  "inactive" (optional, not set by default)
* `GRACE_PERIOD`: Duration in milliseconds after which an inactive aggregation
  is eligible to be garbage collected (optional, defaults to `"3600000"`)
* `INCREMENTAL_UPDATE_CHUNK_SIZE`: Number of processed reports after which the
  aggregation is sent downstream. Used to provide a size-based incremental
  update to the consumers of the aggregation (optional, by default no
  size-based updates are sent to the consumers)
* `STREAM_TIME`: The notion of stream time which is used for reports. This value
  can be one of `eventtime`, `reporttime`, or `processingtime`. See below for
  more information (optional, defaults to `"processingtime"`)
* `PUNCTUATOR_INTERVAL`: Interval in milliseconds in which the wall clock
  punctuator is active (optional, defaults to `"10000"`)
* `STEPS`: Comma-separated list of step names for which aggregations are made

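For illustration, the following sketch (hypothetical Scala, not taken from the
service's codebase) shows how these variables and their documented defaults
could be read:

```scala
// Minimal configuration-reading sketch; names are hypothetical,
// defaults are the ones documented above.
object ServiceConfig {
  private def env(name: String): Option[String] = sys.env.get(name)
  private def required(name: String): String =
    env(name).getOrElse(sys.error(s"$name must be set"))

  val applicationId: String = required("APPLICATION_ID")
  val topicIn: String = required("TOPIC_IN")
  val importApiDomain: String = required("IMPORT_API_DOMAIN")
  val importApiPort: Int = required("IMPORT_API_PORT").toInt
  val elapsedTimeMultiplier: Double =
    env("ELAPSED_TIME_MULTIPLIER").map(_.toDouble).getOrElse(1.0)
  val minimalInactivityGap: Long =
    env("MINIMAL_INACTIVITY_GAP").map(_.toLong).getOrElse(0L)
  val maximalInactivityGap: Option[Long] =
    env("MAXIMAL_INACTIVITY_GAP").map(_.toLong)
  val gracePeriod: Long = env("GRACE_PERIOD").map(_.toLong).getOrElse(3600000L)
  val incrementalUpdateChunkSize: Option[Int] =
    env("INCREMENTAL_UPDATE_CHUNK_SIZE").map(_.toInt)
  val streamTime: String = env("STREAM_TIME").getOrElse("processingtime")
  val punctuatorInterval: Long =
    env("PUNCTUATOR_INTERVAL").map(_.toLong).getOrElse(10000L)
  val steps: Seq[String] =
    required("STEPS").split(',').toSeq.map(_.trim).filter(_.nonEmpty)
}
```
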
**Important**: A Kafka processor node is generated for every step listed in the 
`STEPS` environment variable. Because these processor nodes are built during the
startup phase of the application, every change in the `STEPS` variable requires a
restart of the application.

## General considerations

### Event-based aggregations

This service groups reports (_events_) generated by the services (_steps_) in
the Memobase import workflow by _session_ (a specific import) and step. Its aim
is to aggregate the respective statuses and propagate the finished aggregation
back to the user who initialised the import.

Unfortunately, there is no way to know for certain when an aggregation is
complete, since the service feeds on an event stream which is by definition
unbounded. The service therefore has to make some assumptions about when the
aggregation can be considered complete. A safe strategy would be to wait
indefinitely, which of course runs contrary to the goal of informing a user
when the counting (aggregation) has finished. It therefore makes sense to
define a so-called _inactivity gap_: the minimum time that must pass during
which no new report belonging to a certain step-session-couple arrives. The
value for this inactivity gap should be neither too high (the user has to wait
too long to see the results) nor too low (the aggregation isn't finished yet).

The service offers settings which provide a balance between "too risky"
(or lossy) and "too slow" (or conservative). On the one hand, you can define
a `MINIMAL_INACTIVITY_GAP` which marks the lower time boundary for sending (or
_flushing_) an aggregation. On the other hand, you can leverage
`ELAPSED_TIME_MULTIPLIER` to compute an adequate inactivity gap more
dynamically. This is especially important when events come in irregularly
(e.g. when phases of heavy and light load on the import process alternate). The
value is a factor by which the largest difference between the timestamps of
two consecutive reports is multiplied to determine the final inactivity gap.
This of course doesn't help much when a sharp rise in the load occurs
(e.g. event1 -> 532ms -> event2 -> 4323ms -> event3), but should accommodate
more relaxed cases.
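
A sketch of how the effective gap could be derived from these settings; the
function and parameter names are hypothetical, and the clamping against
`MAXIMAL_INACTIVITY_GAP` is an assumption based on the variable descriptions
above:

```scala
// Hypothetical sketch: scale the largest observed gap between two
// consecutive reports and clamp it between the configured bounds.
def effectiveInactivityGap(
    largestObservedGap: Long,  // ms between two consecutive reports
    multiplier: Double,        // ELAPSED_TIME_MULTIPLIER
    minGap: Long,              // MINIMAL_INACTIVITY_GAP
    maxGap: Option[Long]       // MAXIMAL_INACTIVITY_GAP, if set
): Long = {
  val dynamic      = (largestObservedGap * multiplier).toLong
  val lowerClamped = math.max(dynamic, minGap)
  maxGap.fold(lowerClamped)(math.min(lowerClamped, _))
}
```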

However, the case of late-arriving reports can't simply be neglected. To
catch it, the service introduces versioned message passing. An aggregation
flushed for the first time starts as version `1` and receives a unique
identifier. If reports for this aggregation still arrive after that point in
time, they are added to the aggregation as well. When the inactivity gap is
exceeded once again, a new version (version `2`) of the aggregation is created.
It contains the __overall__ count of report statuses belonging to that
aggregation. Furthermore, it provides a field (`previousMessageId`)
that holds the identifier of the previously issued message. With all this, it
is possible for a downstream service to "update" an aggregation if need be.
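
As an illustration, a downstream consumer could reconcile versions along these
lines (hypothetical Scala types; the fields mirror the output format described
below):

```scala
// Hypothetical snapshot type; counts are overall totals, not deltas.
final case class AggregationSnapshot(
    sessionId: String,
    step: String,
    messageId: String,
    previousMessageId: Option[String],
    messageVersion: Int,
    total: Int
)

// Keep the newest version per (sessionId, step); a higher
// messageVersion simply replaces the older snapshot.
def reconcile(
    current: Option[AggregationSnapshot],
    incoming: AggregationSnapshot
): AggregationSnapshot = current match {
  case Some(c) if c.messageVersion >= incoming.messageVersion => c
  case _                                                      => incoming
}
```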

The same approach is used if an `INCREMENTAL_UPDATE_CHUNK_SIZE` is defined,
which means that after the defined number of processed reports a snapshot of the
aggregation is sent to the consumers. This happens regardless of the set
inactivity gap.
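
A minimal sketch of this size-based trigger (hypothetical function name):

```scala
// Fires after every INCREMENTAL_UPDATE_CHUNK_SIZE processed reports,
// independent of the inactivity gap.
def shouldEmitIncrementalUpdate(
    processedReports: Long,
    chunkSize: Option[Int]  // INCREMENTAL_UPDATE_CHUNK_SIZE, if set
): Boolean =
  chunkSize.exists(n => n > 0 && processedReports % n == 0)
```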

Finally, there is a hard limit on retaining aggregations in memory:
`GRACE_PERIOD` defines the time (in milliseconds) after which an aggregation
is deleted in the service. If a report for the same step-session-couple
arrives even after that period, a new aggregation is created,
__which is independent of the previous one__ (i.e. the new aggregation will
never cover the whole import).

### Time

Because the service acts on events which were generated in a more or less
distant past, the so-called _stream time_ rather than the "normal" _wall
clock time_ is used to timestamp the events. The actual timestamp can be based
on different metrics (set via the `STREAM_TIME` environment variable):

* `eventtime`: The original timestamp of the event (normally added by the
  producer at the very beginning of the import workflow)
* `reporttime`: The timestamp which is added to the respective message by the
  service that produced the report
* `processingtime`: The time at which the report arrived at this service
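
A sketch of how the timestamp could be selected according to `STREAM_TIME`
(the `Report` type and its fields are hypothetical):

```scala
// Hypothetical report type carrying both candidate timestamps.
final case class Report(eventTime: Long, reportTime: Long)

// Selects the timestamp metric according to STREAM_TIME.
def timestampOf(report: Report, streamTimeMode: String, arrivalTime: Long): Long =
  streamTimeMode match {
    case "eventtime"  => report.eventTime
    case "reporttime" => report.reportTime
    case _            => arrivalTime // "processingtime" (the default)
  }
```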

Each event arriving at the service advances the stream time to its respective
timestamp. It is important to note that this stream time isn't congruent with
the _wall clock time_ at all, neither in the sense of the respective epoch time
nor in the sense of its progression. In fact, the stream time normally advances
much faster than the _wall clock time_.

To illustrate the point, let's think of an event stream which reaches two
weeks back, and of this service, which has only just connected to the
respective topic. Let's further assume that in this two-week period 10'000
events have happened. Processing these events takes the service just a couple
of seconds, depending on the total workload in the cluster. So the wall clock
has only advanced a little, while the stream time progressed much, much faster,
running through two weeks of real time in the twinkling of an eye.

The service normally uses the stream time to decide on flushing and deleting
aggregations. There is, however, a small disadvantage when using stream time
for this purpose: it won't advance if no new events happen. If the service
relied entirely on the stream time, the last aggregation would be frozen until
new events arrive. This is not acceptable, and therefore another approach is
used alongside the stream-time one. In regular intervals (where the regularity
is based on the wall clock time) a scheduler kicks in and checks for inactive
or expired aggregations. The interval isn't that important (apart from the
fact that it shouldn't be too long, to avoid unneeded latency), but can
nevertheless be set via the `PUNCTUATOR_INTERVAL` variable (value in
milliseconds).
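
A rough sketch of these two complementary mechanisms (all names hypothetical;
the exact flush and expiry conditions are assumptions based on the description
above):

```scala
// Stream time advances only with incoming reports; a wall-clock
// punctuator sweeps periodically for inactive or expired aggregations.
final case class AggState(key: String, lastSeenStreamTime: Long, inactivityGap: Long)

object Sweeper {
  val gracePeriod: Long = 3600000L // GRACE_PERIOD default
  var streamTime: Long = Long.MinValue

  // Called for every report: stream time jumps to the report's timestamp.
  def onReport(reportTimestamp: Long): Unit =
    streamTime = math.max(streamTime, reportTimestamp)

  // Called every PUNCTUATOR_INTERVAL ms, driven by the wall clock.
  def onPunctuate(states: Iterable[AggState]): Unit =
    states.foreach { agg =>
      val idle = streamTime - agg.lastSeenStreamTime
      if (idle >= agg.inactivityGap) println(s"flush ${agg.key}")
      if (idle >= gracePeriod) println(s"delete ${agg.key}")
    }
}
```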

## Output

The service produces a JSON object containing the following values:

* `sessionId`: The unique id of the import process session
* `step`: Name of the respective step
* `total`: Total of analysed reports
* `success`: Total of analysed reports with status `SUCCESS`
* `ignore`: Total of analysed reports with status `IGNORE`
* `warning`: Total of analysed reports with status `WARNING`
* `fatal`: Total of analysed reports with status `FATAL`
* `earliest`: Timestamp of earliest message (event time)
* `latest`: Timestamp of latest message (event time)
* `elapsedTime`: Duration between earliest and latest message
* `recordSetId`: RecordSet's ID
* `institutionId`: Institution's ID
* `messageId`: Unique message ID. Changes with each new version
* `previousMessageId`: ID of previous message. Empty if no predecessor exists
* `messageVersion`: Message version, beginning with `1` and increasing by
  `1` at a time

### Example

```json
{
  "sessionId": "550e8400-e29b-11d4-a716-446655440000",
  "step": "media-metadata-extractor",
  "total": 468,
  "success": 413,
  "ignore": 0,
  "warning": 455,
  "fatal": 0,
  "earliest": "2021-03-08T14:02:23.232",
  "latest": "2021-03-08:T14:05:32.451",
  "elapsedTime": "00:03:11.219",
  "recordSetId": "fss-001",
  "institutionId": "fss",
  "institutionId": "fss",
  "messageId": "1b7d8224-451a-4afb-b5d2-6537acb74051",
  "previousMessageId": "04543477-3bb5-4d01-a318-52ded1ce5e1c",
  "version": 3
181
}
```