Skip to content

Commit

Permalink
source-zendesk-support: checkpoint during AuditLogs stream
Browse files Browse the repository at this point in the history
For tasks that have a significant number of audit logs in Zendesk
Support, it can take more than 24 hours to backfill. With the current
`AuditLogs` stream strategy of reading records in descending order then
stopping when we've reached a record we already read, we can't
checkpoint until the backfill is completed. This means tasks that take
more than 24 hours to backfill `AuditLogs` can get stuck restarting &
attempting to backfill.

This PR updates the `AuditLogs` stream to read records in ascending
order. This lets the connector checkpoint after each response & pick up
where it left off whenever it's restarted.
Some notable changes include:
- Passing a list containing `start_time` and `end_time` for the
  `filter[created_at][]` query param to bound the responses' results to
  the specified timespan.
- Pushing `end_time` 30 seconds into the past to avoid missing records
  if we query Zendesk while it is still creating multiple records with
  the same `created_at`.
- Keeping `AuditLogs` on the older `get_updated_state` method of
  updating state instead of migrating to the newer `state` property.
  This is to avoid rebackfilling all existing tasks.
  • Loading branch information
Alex-Bair committed Oct 21, 2024
1 parent 9aa980f commit c65157a
Showing 1 changed file with 16 additions and 36 deletions.
52 changes: 16 additions & 36 deletions source-zendesk-support/source_zendesk_support/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,54 +754,34 @@ class AuditLogs(SourceZendeskSupportCursorPaginationStream):
This endpoint does not respect the start_time param. It requires two query params with the same name to filter by date.
See https://support.zendesk.com/hc/en-us/community/posts/4859612547866-Audit-Log-API-error.
Instead, we mimic the TicketAudits stream strategy of not paginating further when the most recent page contains results we don't need.
"""

# Key under which the list of records is found in the response JSON body.
response_list_name: str = "audit_logs"
# audit_logs doesn't have the 'updated_by' field
# The record field used as the incremental cursor (read from / compared against stream state).
cursor_field = "created_at"
# NOTE(review): presumably the number of records between state checkpoints, per the Airbyte CDK — confirm.
state_checkpoint_interval = 100

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = {"page[size]": self.page_size}
def request_params(
    self,
    next_page_token: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
    **kwargs,
) -> MutableMapping[str, Any]:
    """
    Build the query params for one audit-logs request.

    The request asks for records in ascending "created_at" order, bounded to the
    timespan between the last cursor value (or the configured start date when no
    cursor exists yet) and 30 seconds before now.

    :param next_page_token: pagination params from the previous response, merged into the result when present.
    :param stream_state: the stream's current state; may be None or empty on the first sync.
    :return: the query params to send with the request.
    """
    # stream_state defaults to None, so guard before reading the cursor from it.
    cursor_value = (stream_state or {}).get(self.cursor_field)
    start_time = cursor_value or self._start_date
    # The end_time is moved a little in the past to avoid missing records that share the same "created_at" since
    # records with the current cursor value are ignored.
    end_time = (datetime.now(tz=UTC) - timedelta(seconds=30)).strftime(DATETIME_FORMAT)

    params = {
        "page[size]": self.page_size,
        # By default, results are returned in descending order.
        # "sort" is required to get results returned in ascending order.
        "sort": "created_at",
        # "filter[created_at][]" filters the responses' results to only records created within the specified timespan.
        "filter[created_at][]": [start_time, end_time],
    }

    if next_page_token:
        params.update(next_page_token)

    return params

def _is_before_last_cursor_date(self, response: requests.Response, stream_state: Mapping[str, Any]) -> bool:
    """
    Check whether the first document of a response was created before the last cursor date
    (if it exists) or the start date. This allows us to determine when to stop paginating backwards.

    :param response: the HTTP response whose JSON body holds the page of documents.
    :param stream_state: the stream's current state; may be None or empty.
    :return: True when the first document's cursor value sorts before the cursor / start date.
        A page with no documents is treated the same as a page whose first document has no
        cursor value.
    """
    # `or [{}]` covers both a missing key and an empty list; indexing an empty list would raise IndexError.
    documents = response.json().get(self.response_list_name) or [{}]
    document_created_at = documents[0].get(self.cursor_field, "")
    cursor_date = (stream_state or {}).get(self.cursor_field) or self._start_date
    # Plain string comparison of the two timestamp values.
    return document_created_at < cursor_date

# Same as airbyte_cdk's HttpStream._read_pages method, except that pagination also ends
# once a response contains documents created before our last cursor date / start date.
def _read_pages(
    self,
    records_generator_fn: Callable[
        [requests.PreparedRequest, requests.Response, Mapping[str, Any], Optional[Mapping[str, Any]]], Iterable[StreamData]
    ],
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    """
    Fetch pages one at a time and yield the records produced for each page.

    Pagination ends when there is no next page token, or when
    `_is_before_last_cursor_date` reports that the current page has reached
    documents older than the cursor / start date.
    """
    if not stream_state:
        stream_state = {}

    page_token = None
    while True:
        request, response = self._fetch_next_page(stream_slice, stream_state, page_token)
        yield from records_generator_fn(request, response, stream_state, stream_slice)

        page_token = self.next_page_token(response)
        # No further page to request: done.
        if not page_token:
            break
        # The current page already reaches past what we need: done.
        if self._is_before_last_cursor_date(response, stream_state):
            break

    # Yield from an empty iterable so the generator is still valid when no records were produced.
    yield from ()

class Users(SourceZendeskSupportIncrementalCursorExportStream):
"""Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export-cursor-based"""
Expand Down

0 comments on commit c65157a

Please sign in to comment.