Ingest Tenable SC Logs using Cribl Collectors
- Overview
- Configuring Line Breakers
- Storing the Tenable API Host in Variables
- Storing the Tenable Credentials as a Cribl Secret
- Configuring REST Collectors
- Scheduling Collectors
- Processing Tenable Data in Pipelines
- Create Your Routes
- Tenable API References
Overview
We recently worked with a client that wanted to replace their Splunk Heavy Forwarder with a Cribl Stream worker. One of the challenges was that we needed to replace functionality of the Tenable Add-On for Splunk that pulls scan, asset and plugin data from the Tenable SC appliance. This document outlines the steps needed to setup that integration to act as a one-for-one replacement of the Splunk app inputs.
Configuring Line Breakers
The first step to setup the collectors is to create custom line breakers for the vulnerabilities, assets and plugins. When making requests to Tenable, the API server responds with a large JSON object with the data embedded in the JSON. The line breakers are slightly different for each type of requested data. These will instruct Cribl how to parse the data out of the JSON and assign appropriate timestamps to the logs.
- In the Cribl Worker Group pages, go to Processing -> Knowledge.
- Select Event Breaker Rules from the menu on the left.
- Click Add Ruleset for each of the types below. Assign the ID to be the name of each group title below. Then create a single rule in each, using the settings provided below.
Tenable JSON Basic Ruleset
- Name =
Parse Results
- Filter condition =
true
- Event Breaker Settings – Enabled = On
- Event Breaker type = JSON Array
- Array field =
response.results
- JSON extract fields = On
- Timestamp field =
lastSeen
- Max event bytes =
5000000
- Timestamp anchor =
^
- Timestamp format = Manual format -
%s
Tenable JSON Asset Ruleset
- Name =
Parse Results
- Filter condition =
true
- Event Breaker Settings – Enabled = On
- Event Breaker type = JSON Array
- Array field =
response.results
- JSON extract fields = On
- Timestamp field = (leave empty)
- Max event bytes =
5000
- Timestamp anchor =
^
- Timestamp format = Current time
Tenable JSON Plugins Ruleset
- Name =
Parse Results
- Filter condition =
true
- Event Breaker Settings – Enabled = On
- Event Breaker type = JSON Array
- Array field =
response
- JSON extract fields = On
- Timestamp field =
modifiedTime
- Max event bytes =
300000
- Timestamp anchor =
^
- Timestamp format = Manual format -
%s
Storing the Tenable API Host in Variables
As we create the Tenable ingests, we will reference the Tenable API endpoint in URLs to connect to. We will also add a host field to every event tagging it with the API host name. Given the repetitive nature of this data, we setup a variable in Cribl to track this and then reference that variable name in all the configurations. This also makes it easier to change later if the API server moves.
- From the Cribl Worker Group configuration pages, go to Processing -> Knowledge.
- Click on the Variables menu on the left side of the page.
- Click the Add Variable button.
- Set the following values and click Save.
- Name =
tenableScHost
- Description =
Hostname for the Tenable SC server – used in collector source definitions.
- Type = string
- Value = (Set the value to be the IP or DNS name of the Tenable API server to connect to.)
- Name =
Storing the Tenable Credentials as a Cribl Secret
We need to securely store the access and secret key needed to authentication to the Tenable API server. These will be stored as secrets in Cribl.
- From the Cribl Worker Group configuration pages, go to Group Settings.
- Click on the Security menu on the left side of the page.
- Click on the Secrets menu.
- Click the Add Secret button.
- Set the following values and click Save.
- Secret name =
Tenable_SC_Access_Key
- Secret type = API key and secret key
- Description =
Access Tenable SC appliance
- API key = (paste API key value)
- Secret key = (paste secret key value)
- Secret name =
Configuring REST Collectors
With the line breakers, variables and credentials defined, we then create data sources to pull each of the data sets from Tenable. In the Data Sources page for a Cribl Worker Group, you will look for the Collectors REST type source and create new sources for each of the ingests below.
tenable_sc_vuln
Extracts the open and reopened vulnerabilities
- Collect URL =
`https://${C.vars.tenableScHost}/rest/analysis`
- Collect method = POST with Body
- Collect POST body (NOTE: The filter against the lastSeen field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
"type": "vuln",
"sourceType": "cumulative",
"sortField": "lastSeen",
"sortDir": "asc",
"query": {
"tool": "vulndetails",
"type": "vuln",
"filters": [
{"filterName": "lastSeen", "operator": "=", "value": `${(state.since + 1) || (Math.floor((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
{"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
],
"startOffset": (+__e['response.endOffset'] || 0),
"endOffset": ((+__e['response.endOffset'] || 0) + 200)
}
}
- Collect headers
- Name:
x-apikey
, Value:`accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').* apiKey}; secretkey=${C.Secret('Tenable_SC_Access_Key', 'keypair').* secretKey};`
- Name:
- Pagination = Response Body Attribute
- Response attributes:
- response.endOffset
- response.totalRecords
- Page limit =
0
- Last-page expression =
(+__e['response.endOffset']) >= (+__e* ['response.totalRecords'])
- Event Breaker rulesets – Select the Tenable JSON Basic Ruleset
- Fields:
- Field Name:
host
, Value:C.vars.tenableScHost
- Field Name:
tenable_sc_vuln_patched
Retrieves the vulnerabilities that have been patched.
- Collect URL =
`https://${C.vars.tenableScHost}/rest/analysis`
- Collect method = POST with Body
- Collect POST body (NOTE: The filter against the lastMitigated field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
"type": "vuln",
"sourceType": "patched",
"sortField": "lastSeen",
"sortDir": "asc",
"query": {
"tool": "vulndetails",
"type": "vuln",
"filters": [
{"filterName": "lastMitigated", "operator": "=", "value": `${(state.since + 1) || (Math.floor((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
{"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
],
"startOffset": (+__e['response.endOffset'] || 0),
"endOffset": ((+__e['response.endOffset'] || 0) + 200)
}
}
- Collect headers
- Name:
x-apikey
, Value:`accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=$* {C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
- Name:
- Pagination = Response Body Attribute
- Response attributes:
- response.endOffset
- response.totalRecords
- Page limit =
0
- Last-page expression =
(+__e['response.endOffset']) >= (+__e['response.totalRecords'])
- Event Breaker rulesets – Select the Tenable JSON Basic Ruleset
- Fields:
- Field Name:
host
, Value:C.vars.tenableScHost
- Field Name:
state
, Value:'fixed'
- Field Name:
tenable_sc_asset
Retrieves the aggregated asset data tracked by Tenable.
- Collect URL =
`https://${C.vars.tenableScHost}/rest/analysis`
- Collect method = POST with Body
- Collect POST body (NOTE: The filter against the lastSeen field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
"type": "vuln",
"sourceType": "cumulative",
"sortField": "lastSeen",
"sortDir": "asc",
"query": {
"tool": "sumip",
"type": "vuln",
"filters": [
{"filterName": "lastSeen", "operator": "=", "value": `${Math.floor(state.since || ((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
{"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
],
"startOffset": (+__e['response.endOffset'] || 0),
"endOffset": ((+__e['response.endOffset'] || 0) + 200)
}
}
- Collect headers
- Name:
x-apikey
, Value:`accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=$* {C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
- Name:
- Pagination = Response Body Attribute
- Response attributes:
- response.endOffset
- response.totalRecords
- Page limit =
0
- Last-page expression =
(+__e['response.endOffset']) >= (+__e['response.totalRecords'])
- Event Breaker rulesets – Select the Tenable JSON Asset Ruleset
- Fields:
- Field Name:
host
, Value:C.vars.tenableScHost
- Field Name:
tenable_sc_plugin
Retrieves details about the plugin definitions as they are updated.
- Collect URL =
`https://${C.vars.tenableScHost}/rest/plugin`
- Collect method = GET
- Collect parameters:
- Name:
startOffset
, Value:0
- Name:
endOffset
, Value:10000
- Name:
fields
, Value:'id,name,description,family,type,copyright,version,sourceFile,dependencies, requiredPorts,requiredUDPPorts,cpe,srcPort,dstPort,protocol,riskFactor,solution,seeAlso, synopsis,checkType,exploitEase,exploitAvailable,exploitFrameworks,cvssVector,cvssVectorBF, baseScore,temporalScore,stigSeverity,pluginPubDate,pluginModDate,patchPubDate,patchModDate, vulnPubDate,modifiedTime,md5,xrefs,vprScore,vprContext'
- Name:
since
, Value:`${state.since || 0}`
- Name:
sortField
, Value:'modifiedTime'
- Name:
sortDirection
, Value:'ASC'
- Name:
- Collect headers
- Name:
x-apikey
, Value:`accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=${C.Secret* ('Tenable_SC_Access_Key', 'keypair').secretKey};`
- Name:
- Pagination = None
- Event Breaker rulesets – Select the Tenable JSON Plugins Ruleset
- Fields:
- Field Name:
host
, Value:C.vars.tenableScHost
- Field Name:
Scheduling Collectors
With the four collectors defined, we need to setup schedules to periodically make the REST calls to Tenable and pull the data. The schedules also define the logic for setting up state variables. These variables track dates from previous runs that are incremented so that Cribl doesn’t duplicate data on future runs.
From your list of REST collector sources, you will click on the Schedule button to define each of the schedules below. NOTE: The cron schedules are arbitrary and can be adjusted to meet your needs. They are designed to not run on the quarter hours which are popular times to run jobs:
tenable_sc_vuln
- Enabled = On
- Cron schedule =
25 * * * *
- Concurrent run limit =
1
- Skippable = On
- Mode = Full Run
- Time range = Relative
- Earliest = (leave blank)
- Latest = (leave blank)
- State Tracking – Enabled = On
- State update expression =
__timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
- State merge expression =
(prevState.since || 0) > newState.since ? prevState : newState
tenable_sc_vuln_patched
- Enabled = On
- Cron schedule =
25 * * * *
- Concurrent run limit =
1
- Skippable = On
- Mode = Full Run
- Time range = Relative
- Earliest = (leave blank)
- Latest = (leave blank)
- State Tracking – Enabled = On
- State update expression =
__timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
- State merge expression =
(prevState.since || 0) > newState.since ? prevState : newState
tenable_sc_asset
- Enabled = On
- Cron schedule =
47 * * * *
- Concurrent run limit =
1
- Skippable = On
- Mode = Full Run
- Time range = Relative
- Earliest = (leave blank)
- Latest = (leave blank)
- State Tracking – Enabled = On
- State update expression =
__timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
- State merge expression =
(prevState.since || 0) > newState.since ? prevState : newState
tenable_sc_plugin
- Enabled = On
- Cron schedule =
39 */4 * * *
- Concurrent run limit =
1
- Skippable = On
- Mode = Full Run
- Time range = Relative
- Earliest = (leave blank)
- Latest = (leave blank)
- State Tracking – Enabled = On
- State update expression =
__timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
- State merge expression =
(prevState.since || 0) > newState.since ? prevState : newState
Processing Tenable Data in Pipelines
Finally, we need to do some processing of this data as it passes through Cribl. The Splunk TA for Tenable has some logic to manipulate the data before storing it in Splunk. The following pipelines are designed to replicate those changes. Create a new pipeline for each section and then create the functions per the instructions given.
tenable_sc_vuln
The first pipeline is for processing logs from the tenable_sc_vuln and tenable_sc_vuln_patched collectors.
Eval Function – The first function in the pipeline does a number of manipulations of fields on the event. This includes additional parsing, hard coding information and changing values to a human readable string.
Name | Value Expression |
_uniqueness | uniqueness.split(',') || [] |
SC_address | host || 'tenable_sc' |
organization (This should match the customer’s organization info. Example provided here.) | {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} |
custom_severity | Boolean(Number.parseInt(recastRisk) || 0) |
acceptRisk | Boolean(Number.parseInt(acceptRisk) || 0) |
hasBeenMitigated | Boolean(Number.parseInt(hasBeenMitigated) || 0) |
recastRisk | Boolean(Number.parseInt(recastRisk) || 0) |
severity_id | severity['id'] || '' |
vendor_severity | severity['name'] || '' |
severity_description | severity['description'] || '' |
severity | ['informational','low','medium','high','critical'][Number.parseInt(severity_id)] || '' |
plugin_id | pluginID || '' |
state | state || (hasBeenMitigated ? 'reopened' : 'open') |
Code Function – The Splunk TA generates a field called sc_uniqueness
. This is based on the fields that Tenable provided in the uniqueness
field, which we split into an array and stored in _uniqueness
. This code function walks through each of those field names and attempts to pull the value from the event. It then creates an underscore separated value with those field values.
__e['sc_uniqueness'] = (__e['_uniqueness'] || [])
.map(key => key=='repositoryID' ? (__e['repository']['id'] || '') : (__e[key] || ''))
.join('_');
Serialize Function – This function takes all the data changes and rewrites the raw event as a new JSON representation of the data.
- Type = JSON Object
- Fields to serialize:
!_*
!host
!source
!sourcetype
!index
!cribl*
*
- Destination field =
_raw
Eval Function – The final eval function sets all the meta data that we need for Splunk and removes all other fields.
Name | Value Expression |
index | 'tenable' (Be sure to set this to the Splunk index for your environment) |
sourcetype | 'tenable:sc:vuln' |
source | `cribl:${__collectible.collectorId}` |
- Keep fields:
_raw
_time
host
index
source
sourcetype
- Remove fields:
*
tenable_sc_asset
The next pipeline is for processing logs from the tenable_sc_asset collector.
Eval Function – The first function in the pipeline does a minimal amount of field manipulation of the data.
Name | Value Expression |
_uniqueness | uniqueness.split(',') || [] |
SC_address | host || 'tenable_sc' |
organization (This should match the customer’s organization info. Example provided here.) | {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} |
Code Function – This pipeline uses the same code function as above to create the sc_uniqueness field.
Serialize Function – This pipeline uses the same Serialize function as above to rewrite the _raw event.
Eval Function – Finally, we set the meta for Splunk.
Name | Value Expression |
index | 'tenable' (Be sure to set this to the Splunk index for your environment) |
sourcetype | 'tenable:sc:assets' |
source | `cribl:${__collectible.collectorId}` |
- Keep fields:
_raw
_time
host
index
source
sourcetype
- Remove fields:
*
tenable_sc_plugin
The final pipeline is for processing logs from the tenable_sc_plugin collector.
Eval Function – The first function in the pipeline does a minimal amount of field manipulation of the data.
Name | Value Expression |
_uniqueness | uniqueness.split(',') || [] |
SC_address | host || 'tenable_sc' |
organization (This should match the customer’s organization info. Example provided here.) | {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} |
Code Function – This pipeline uses the same code function as above to create the sc_uniqueness field.
Serialize Function – This pipeline uses the same Serialize function as above to rewrite the _raw event.
Eval Function – Finally, we set the meta for Splunk.
Name | Value Expression |
index | 'tenable' (Be sure to set this to the Splunk index for your environment) |
sourcetype | 'tenable:sc:plugin' |
source | `cribl:${__collectible.collectorId}` |
- Keep fields:
_raw
_time
host
index
source
sourcetype
- Remove fields:
*
Create Your Routes
With all of the infrastructure above setup, you can finish your implementation by creating routes that connect your new collectors to Splunk, using the pipelines defined above.
Tenable API References
- Tenable Security Center API: Analysis - https://docs.tenable.com/security-center/api/Analysis.htm
- Tenable Security Center API: Plugin - https://docs.tenable.com/security-center/api/Plugin.htm
- Retrieve Vulnerability Data for a Specific Time Range - https://docs.tenable.com/security-center/best-practices/api/Content/RetrieveVulnerabilityDataForSpecificTimeRange.htm
- Retrieve Asset Data from Tenable Security Center - https://docs.tenable.com/security-center/best-practices/api/Content/RetrieveAssetDataFromSC.htm