
Overview

We recently worked with a client who wanted to replace their Splunk Heavy Forwarder with a Cribl Stream worker. One challenge was replacing the functionality of the Tenable Add-On for Splunk, which pulls scan, asset, and plugin data from the Tenable SC appliance. This document outlines the steps needed to set up that integration as a one-for-one replacement of the Splunk app inputs.

Configuring Line Breakers

The first step in setting up the collectors is to create custom line breakers for the vulnerability, asset, and plugin data. When Cribl makes a request, the Tenable API server responds with a large JSON object with the data embedded inside it. The line breakers, which differ slightly for each type of requested data, instruct Cribl how to parse the events out of the JSON and assign appropriate timestamps to the logs.

  1. In the Cribl Worker Group pages, go to Processing -> Knowledge.
  2. Select Event Breaker Rules from the menu on the left.
  3. Click Add Ruleset for each of the types below, using the group title as the ruleset ID. Then create a single rule in each ruleset with the settings provided below.

Tenable JSON Basic Ruleset

  • Name = Parse Results
  • Filter condition = true
  • Event Breaker Settings – Enabled = On
  • Event Breaker type = JSON Array
  • Array field = response.results
  • JSON extract fields = On
  • Timestamp field = lastSeen
  • Max event bytes = 5000000
  • Timestamp anchor = ^
  • Timestamp format = Manual format - %s

Tenable JSON Basic Ruleset screen
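Conceptually, this ruleset tells Cribl to split the array at response.results into individual events and to read each event's timestamp from the epoch-seconds lastSeen field (the %s format). A minimal JavaScript sketch of that behavior, using a hypothetical response payload:

```javascript
// Hypothetical Tenable SC response shape; field names follow the ruleset above.
const apiResponse = {
  response: {
    results: [
      { pluginID: '19506', lastSeen: '1700000000' },
      { pluginID: '10180', lastSeen: '1700003600' }
    ]
  }
};

// "Array field = response.results": each array element becomes one event.
// "Timestamp format = %s": lastSeen is parsed as epoch seconds into _time.
const events = apiResponse.response.results.map(result => ({
  ...result,
  _time: Number(result.lastSeen)
}));

console.log(events.length);    // 2
console.log(events[0]._time);  // 1700000000
```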

Tenable JSON Asset Ruleset

  • Name = Parse Results
  • Filter condition = true
  • Event Breaker Settings – Enabled = On
  • Event Breaker type = JSON Array
  • Array field = response.results
  • JSON extract fields = On
  • Timestamp field = (leave empty)
  • Max event bytes = 5000
  • Timestamp anchor = ^
  • Timestamp format = Current time

Tenable JSON Asset Ruleset screen

Tenable JSON Plugins Ruleset

  • Name = Parse Results
  • Filter condition = true
  • Event Breaker Settings – Enabled = On
  • Event Breaker type = JSON Array
  • Array field = response
  • JSON extract fields = On
  • Timestamp field = modifiedTime
  • Max event bytes = 300000
  • Timestamp anchor = ^
  • Timestamp format = Manual format - %s

Tenable JSON Plugins Ruleset screen

Storing the Tenable API Host in Variables

As we create the Tenable ingests, we will reference the Tenable API endpoint in the URLs we connect to. We will also add a host field to every event, tagging it with the API host name. Given the repetitive nature of this data, we set up a variable in Cribl to hold the host name and then reference that variable in all the configurations. This also makes it easier to change later if the API server moves.

  1. From the Cribl Worker Group configuration pages, go to Processing -> Knowledge.
  2. Click on the Variables menu on the left side of the page.
  3. Click the Add Variable button.
  4. Set the following values and click Save.
    • Name = tenableScHost
    • Description = Hostname for the Tenable SC server – used in collector source definitions.
    • Type = string
    • Value = (Set the value to be the IP or DNS name of the Tenable API server to connect to.)

Creating the tenableScHost variable

Storing the Tenable Credentials as a Cribl Secret

We need to securely store the access key and secret key used to authenticate to the Tenable API server. These will be stored as secrets in Cribl.

  1. From the Cribl Worker Group configuration pages, go to Group Settings.
  2. Click on the Security menu on the left side of the page.
  3. Click on the Secrets menu.
  4. Click the Add Secret button.
  5. Set the following values and click Save.
    • Secret name = Tenable_SC_Access_Key
    • Secret type = API key and secret key
    • Description = Access Tenable SC appliance
    • API key = (paste API key value)
    • Secret key = (paste secret key value)

Creating the credential secret

Configuring REST Collectors

With the line breakers, variables, and credentials defined, we can create the data sources that pull each data set from Tenable. In the Data Sources page for a Cribl Worker Group, look for the Collectors REST source type and create a new source for each of the ingests below.

tenable_sc_vuln

Extracts the open and reopened vulnerabilities.

  • Collect URL = `https://${C.vars.tenableScHost}/rest/analysis`
  • Collect method = POST with Body
  • Collect POST body (NOTE: The filter against the lastSeen field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
    "type": "vuln",
    "sourceType": "cumulative",
    "sortField": "lastSeen",
    "sortDir": "asc",
    "query": {
        "tool": "vulndetails",
        "type": "vuln",
        "filters": [
            {"filterName": "lastSeen", "operator": "=", "value": `${(state.since + 1) || (Math.floor((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
            {"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
        ],
        "startOffset": (+__e['response.endOffset'] || 0),
        "endOffset": ((+__e['response.endOffset'] || 0) + 200)
    }
}
  • Collect headers
    • Name: x-apikey, Value: `accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=${C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
  • Pagination = Response Body Attribute
  • Response attributes:
    • response.endOffset
    • response.totalRecords
  • Page limit = 0
  • Last-page expression = (+__e['response.endOffset']) >= (+__e['response.totalRecords'])
  • Event Breaker rulesets – Select the Tenable JSON Basic Ruleset
  • Fields:
    • Field Name: host, Value: C.vars.tenableScHost

Collector overview for tenable_sc_vuln

Results overview for tenable_sc_vuln
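The lastSeen filter value and the pagination expressions above can be exercised in plain JavaScript. This sketch stubs out Cribl's state and __e objects (an assumption for illustration) to show the time range that gets requested and when paging stops:

```javascript
// Stubs for Cribl's scheduled-collector state and pagination context.
const state = {};                                            // no previous run recorded
const __e = { 'response.endOffset': '200', 'response.totalRecords': '450' };

// lastSeen range: from (state.since + 1), or 7 days ago when no state exists, to now.
const nowSec = Math.floor(Date.now() / 1000);
const fromSec = (state.since + 1) ||
  Math.floor((Date.now() - (7 * 24 * 60 * 60 * 1000)) / 1000);
const lastSeenValue = `${fromSec}-${nowSec}`;                // e.g. "1699395200-1700000000"

// Pagination: request the next 200-record window after the previous endOffset.
const startOffset = (+__e['response.endOffset'] || 0);       // 200
const endOffset = startOffset + 200;                         // 400

// Last-page expression: stop once endOffset has caught up with totalRecords.
const lastPage = (+__e['response.endOffset']) >= (+__e['response.totalRecords']);

console.log(startOffset, endOffset, lastPage);               // 200 400 false
```

Note that when state.since is unset, `state.since + 1` evaluates to NaN, so the `||` falls through to the 7-day default, matching the NOTE above.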

tenable_sc_vuln_patched

Retrieves the vulnerabilities that have been patched.

  • Collect URL = `https://${C.vars.tenableScHost}/rest/analysis`
  • Collect method = POST with Body
  • Collect POST body (NOTE: The filter against the lastMitigated field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
    "type": "vuln",
    "sourceType": "patched",
    "sortField": "lastSeen",
    "sortDir": "asc",
    "query": {
        "tool": "vulndetails",
        "type": "vuln",
        "filters": [
            {"filterName": "lastMitigated", "operator": "=", "value": `${(state.since + 1) || (Math.floor((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
            {"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
        ],
        "startOffset": (+__e['response.endOffset'] || 0),
        "endOffset": ((+__e['response.endOffset'] || 0) + 200)
    }
}
  • Collect headers
    • Name: x-apikey, Value: `accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=${C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
  • Pagination = Response Body Attribute
  • Response attributes:
    • response.endOffset
    • response.totalRecords
  • Page limit = 0
  • Last-page expression = (+__e['response.endOffset']) >= (+__e['response.totalRecords'])
  • Event Breaker rulesets – Select the Tenable JSON Basic Ruleset
  • Fields:
    • Field Name: host, Value: C.vars.tenableScHost
    • Field Name: state, Value: 'fixed'

Collector overview for tenable_sc_vuln_patched

Results overview for tenable_sc_vuln_patched

tenable_sc_asset

Retrieves the aggregated asset data tracked by Tenable.

  • Collect URL = `https://${C.vars.tenableScHost}/rest/analysis`
  • Collect method = POST with Body
  • Collect POST body (NOTE: The filter against the lastSeen field is configured to use a state variable to track the last ingested date. This will get defined during the scheduling of this collector. If the state has not been previously set, it will default to the date 7 days prior to the current date.):
{
    "type": "vuln",
    "sourceType": "cumulative",
    "sortField": "lastSeen",
    "sortDir": "asc",
    "query": {
        "tool": "sumip",
        "type": "vuln",
        "filters": [
            {"filterName": "lastSeen", "operator": "=", "value": `${Math.floor(state.since || ((Date.now() - (7 * 24 * 60 * 60 * 1000))/1000))}-${Math.floor((Date.now())/1000)}`},
            {"filterName": "wasVuln", "operator": "=", "value": "excludeWas"}
        ],
        "startOffset": (+__e['response.endOffset'] || 0),
        "endOffset": ((+__e['response.endOffset'] || 0) + 200)
    }
}
  • Collect headers
    • Name: x-apikey, Value: `accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=${C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
  • Pagination = Response Body Attribute
  • Response attributes:
    • response.endOffset
    • response.totalRecords
  • Page limit = 0
  • Last-page expression = (+__e['response.endOffset']) >= (+__e['response.totalRecords'])
  • Event Breaker rulesets – Select the Tenable JSON Asset Ruleset
  • Fields:
    • Field Name: host, Value: C.vars.tenableScHost

Collector overview for tenable_sc_asset

Results overview for tenable_sc_asset

tenable_sc_plugin

Retrieves details about the plugin definitions as they are updated.

  • Collect URL = `https://${C.vars.tenableScHost}/rest/plugin`
  • Collect method = GET
  • Collect parameters:
    • Name: startOffset, Value: 0
    • Name: endOffset, Value: 10000
    • Name: fields, Value: 'id,name,description,family,type,copyright,version,sourceFile,dependencies,requiredPorts,requiredUDPPorts,cpe,srcPort,dstPort,protocol,riskFactor,solution,seeAlso,synopsis,checkType,exploitEase,exploitAvailable,exploitFrameworks,cvssVector,cvssVectorBF,baseScore,temporalScore,stigSeverity,pluginPubDate,pluginModDate,patchPubDate,patchModDate,vulnPubDate,modifiedTime,md5,xrefs,vprScore,vprContext'
    • Name: since, Value: `${state.since || 0}`
    • Name: sortField, Value: 'modifiedTime'
    • Name: sortDirection, Value: 'ASC'
  • Collect headers
    • Name: x-apikey, Value: `accesskey=${C.Secret('Tenable_SC_Access_Key', 'keypair').apiKey}; secretkey=${C.Secret('Tenable_SC_Access_Key', 'keypair').secretKey};`
  • Pagination = None
  • Event Breaker rulesets – Select the Tenable JSON Plugins Ruleset
  • Fields:
    • Field Name: host, Value: C.vars.tenableScHost

Collector overview for tenable_sc_plugin

Results overview for tenable_sc_plugin

Scheduling Collectors

With the four collectors defined, we need to set up schedules that periodically make the REST calls to Tenable and pull the data. The schedules also define the logic for the state variables, which track timestamps from previous runs so that Cribl doesn’t duplicate data on future runs.

From your list of REST collector sources, click the Schedule button to define each of the schedules below. NOTE: The cron schedules are arbitrary and can be adjusted to meet your needs; they are designed to avoid the quarter hours, which are popular times to run jobs:

tenable_sc_vuln

  • Enabled = On
  • Cron schedule = 25 * * * *
  • Concurrent run limit = 1
  • Skippable = On
  • Mode = Full Run
  • Time range = Relative
  • Earliest = (leave blank)
  • Latest = (leave blank)
  • State Tracking – Enabled = On
  • State update expression = __timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
  • State merge expression = (prevState.since || 0) > newState.since ? prevState : newState

Schedule for tenable_sc_vuln

Schedule state for tenable_sc_vuln
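The state expressions shared by all four schedules keep the newest event timestamp seen so far. A sketch of their behavior in plain JavaScript, with state, _time, and __timestampExtracted stubbed out for illustration:

```javascript
// State update: after each event, keep the larger of the stored and current timestamps.
// __timestampExtracted, _time, and state are provided by Cribl; stubbed here.
function updateState(state, _time, __timestampExtracted) {
  return __timestampExtracted !== false &&
    { since: (state.since || 0) > _time ? state.since : _time };
}

// State merge: when multiple workers report state, keep the newest `since`.
function mergeState(prevState, newState) {
  return (prevState.since || 0) > newState.since ? prevState : newState;
}

let state = {};
state = updateState(state, 1700000000, true) || state;  // first event sets since
state = updateState(state, 1690000000, true) || state;  // older event does not move it back
console.log(state.since);                               // 1700000000

const merged = mergeState({ since: 1700003600 }, { since: 1700000000 });
console.log(merged.since);                              // 1700003600
```

On the next scheduled run, the collector's POST body reads state.since to build the lastSeen range, which is what prevents re-ingesting data.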

tenable_sc_vuln_patched

  • Enabled = On
  • Cron schedule = 25 * * * *
  • Concurrent run limit = 1
  • Skippable = On
  • Mode = Full Run
  • Time range = Relative
  • Earliest = (leave blank)
  • Latest = (leave blank)
  • State Tracking – Enabled = On
  • State update expression = __timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
  • State merge expression = (prevState.since || 0) > newState.since ? prevState : newState

Schedule for tenable_sc_vuln_patched

Schedule state for tenable_sc_vuln_patched

tenable_sc_asset

  • Enabled = On
  • Cron schedule = 47 * * * *
  • Concurrent run limit = 1
  • Skippable = On
  • Mode = Full Run
  • Time range = Relative
  • Earliest = (leave blank)
  • Latest = (leave blank)
  • State Tracking – Enabled = On
  • State update expression = __timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
  • State merge expression = (prevState.since || 0) > newState.since ? prevState : newState

Schedule for tenable_sc_asset

Schedule state for tenable_sc_asset

tenable_sc_plugin

  • Enabled = On
  • Cron schedule = 39 */4 * * *
  • Concurrent run limit = 1
  • Skippable = On
  • Mode = Full Run
  • Time range = Relative
  • Earliest = (leave blank)
  • Latest = (leave blank)
  • State Tracking – Enabled = On
  • State update expression = __timestampExtracted !== false && {since: (state.since || 0) > _time ? state.since : _time}
  • State merge expression = (prevState.since || 0) > newState.since ? prevState : newState

Schedule for tenable_sc_plugin

Schedule state for tenable_sc_plugin

Processing Tenable Data in Pipelines

Finally, we need to do some processing of this data as it passes through Cribl. The Splunk TA for Tenable has some logic to manipulate the data before storing it in Splunk. The following pipelines are designed to replicate those changes. Create a new pipeline for each section and then create the functions per the instructions given.

tenable_sc_vuln

The first pipeline is for processing logs from the tenable_sc_vuln and tenable_sc_vuln_patched collectors.

Eval Function – The first function in the pipeline does a number of manipulations of fields on the event. This includes additional parsing, hard coding information and changing values to a human readable string.

  • Name: _uniqueness, Value Expression: uniqueness.split(',') || []
  • Name: SC_address, Value Expression: host || 'tenable_sc'
  • Name: organization, Value Expression: {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} (This should match the customer’s organization info; an example is shown here.)
  • Name: custom_severity, Value Expression: Boolean(Number.parseInt(recastRisk) || 0)
  • Name: acceptRisk, Value Expression: Boolean(Number.parseInt(acceptRisk) || 0)
  • Name: hasBeenMitigated, Value Expression: Boolean(Number.parseInt(hasBeenMitigated) || 0)
  • Name: recastRisk, Value Expression: Boolean(Number.parseInt(recastRisk) || 0)
  • Name: severity_id, Value Expression: severity['id'] || ''
  • Name: vendor_severity, Value Expression: severity['name'] || ''
  • Name: severity_description, Value Expression: severity['description'] || ''
  • Name: severity, Value Expression: ['informational','low','medium','high','critical'][Number.parseInt(severity_id)] || ''
  • Name: plugin_id, Value Expression: pluginID || ''
  • Name: state, Value Expression: state || (hasBeenMitigated ? 'reopened' : 'open')

Eval statements for the tenable_sc_vuln
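As a quick check of the expressions above: the severity mapping indexes Tenable's numeric severity id into a label list, and the Boolean/parseInt pattern coerces Tenable's '0'/'1' strings into real booleans. Field values here are examples:

```javascript
// Example severity object as returned in Tenable vuln results.
const severity = { id: '3', name: 'High', description: 'High Severity' };

const severity_id = severity['id'] || '';
// Index the numeric id into the label list; unknown ids fall back to ''.
const severityLabel =
  ['informational', 'low', 'medium', 'high', 'critical'][Number.parseInt(severity_id)] || '';
console.log(severityLabel);   // high

// '0'/'1' string flags become booleans.
const hasBeenMitigated = Boolean(Number.parseInt('1') || 0);   // true
const acceptRisk = Boolean(Number.parseInt('0') || 0);         // false
console.log(hasBeenMitigated, acceptRisk);
```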

Code Function – The Splunk TA generates a field called sc_uniqueness, based on the field names Tenable provides in the uniqueness field, which we split into an array and stored in _uniqueness. This code function walks through each of those field names, pulls the corresponding value from the event, and joins the values into an underscore-separated string.

__e['sc_uniqueness'] = (__e['_uniqueness'] || [])
.map(key => key=='repositoryID' ? (__e['repository']['id'] || '') : (__e[key] || ''))
.join('_');

Code function for the tenable_sc_vuln
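For example, with a hypothetical event whose uniqueness fields are repositoryID, ip, and dnsName, the function above produces:

```javascript
// Hypothetical event; field names follow the Tenable vuln schema.
const __e = {
  _uniqueness: ['repositoryID', 'ip', 'dnsName'],
  repository: { id: '7' },
  ip: '10.0.0.5',
  dnsName: 'web01.example.com'
};

// repositoryID is special-cased to read repository.id (guarded here in case
// the repository object is absent); every other name is read as a flat field.
__e['sc_uniqueness'] = (__e['_uniqueness'] || [])
  .map(key => key == 'repositoryID' ? ((__e['repository'] || {})['id'] || '') : (__e[key] || ''))
  .join('_');

console.log(__e['sc_uniqueness']);   // 7_10.0.0.5_web01.example.com
```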

Serialize Function – This function takes all the data changes and rewrites the raw event as a new JSON representation of the data.

  • Type = JSON Object
  • Fields to serialize:
    • !_*
    • !host
    • !source
    • !sourcetype
    • !index
    • !cribl*
    • *
  • Destination field = _raw

Serialize function for the tenable_sc_vuln
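The effect of the serialize step can be sketched as follows: fields matching the ! patterns (underscore-prefixed fields, host, source, sourcetype, index, and cribl*) are excluded, and everything else is JSON-encoded into _raw. The filtering here is a simplified stand-in for Cribl's wildcard matching:

```javascript
// Example event after the earlier pipeline functions have run.
const event = {
  _time: 1700000000, _uniqueness: ['ip'], host: '10.1.2.3',
  source: 'cribl:tenable_sc_vuln', sourcetype: 'tenable:sc:vuln', index: 'tenable',
  cribl_pipe: 'tenable_sc_vuln',
  pluginID: '19506', severity: 'high', ip: '10.0.0.5'
};

// Mirror the "Fields to serialize" exclusions: !_*, !host, !source, !sourcetype,
// !index, !cribl*, then * for everything that remains.
const excluded = key =>
  key.startsWith('_') || key.startsWith('cribl') ||
  ['host', 'source', 'sourcetype', 'index'].includes(key);

const payload = Object.fromEntries(
  Object.entries(event).filter(([key]) => !excluded(key))
);
event._raw = JSON.stringify(payload);

console.log(event._raw);   // {"pluginID":"19506","severity":"high","ip":"10.0.0.5"}
```

The metadata fields stay on the event for Splunk routing; only the cleaned vulnerability data lands in _raw.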

Eval Function – The final eval function sets all the meta data that we need for Splunk and removes all other fields.

  • Name: index, Value Expression: 'tenable' (Be sure to set this to the Splunk index for your environment.)
  • Name: sourcetype, Value Expression: 'tenable:sc:vuln'
  • Name: source, Value Expression: `cribl:${__collectible.collectorId}`
  • Keep fields:
    • _raw
    • _time
    • host
    • index
    • source
    • sourcetype
  • Remove fields:
    • *

Final eval statements for the tenable_sc_vuln

tenable_sc_asset

The next pipeline is for processing logs from the tenable_sc_asset collector.

Eval Function – The first function in the pipeline does a minimal amount of field manipulation of the data.

  • Name: _uniqueness, Value Expression: uniqueness.split(',') || []
  • Name: SC_address, Value Expression: host || 'tenable_sc'
  • Name: organization, Value Expression: {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} (This should match the customer’s organization info; an example is shown here.)

Code Function – This pipeline uses the same code function as above to create the sc_uniqueness field.

Serialize Function – This pipeline uses the same Serialize function as above to rewrite the _raw event.

Eval Function – Finally, we set the meta for Splunk.

  • Name: index, Value Expression: 'tenable' (Be sure to set this to the Splunk index for your environment.)
  • Name: sourcetype, Value Expression: 'tenable:sc:assets'
  • Name: source, Value Expression: `cribl:${__collectible.collectorId}`
  • Keep fields:
    • _raw
    • _time
    • host
    • index
    • source
    • sourcetype
  • Remove fields:
    • *

Final eval statements for the tenable_sc_asset

tenable_sc_plugin

The final pipeline is for processing logs from the tenable_sc_plugin collector.

Eval Function – The first function in the pipeline does a minimal amount of field manipulation of the data.

  • Name: _uniqueness, Value Expression: uniqueness.split(',') || []
  • Name: SC_address, Value Expression: host || 'tenable_sc'
  • Name: organization, Value Expression: {'description':'', 'id':'1', 'name':'Acme Corporation', 'uuid':'A123BC4D-1234-5678-9012-1AB2CDE34EA1'} (This should match the customer’s organization info; an example is shown here.)

Code Function – This pipeline uses the same code function as above to create the sc_uniqueness field.

Serialize Function – This pipeline uses the same Serialize function as above to rewrite the _raw event.

Eval Function – Finally, we set the meta for Splunk.

  • Name: index, Value Expression: 'tenable' (Be sure to set this to the Splunk index for your environment.)
  • Name: sourcetype, Value Expression: 'tenable:sc:plugin'
  • Name: source, Value Expression: `cribl:${__collectible.collectorId}`
  • Keep fields:
    • _raw
    • _time
    • host
    • index
    • source
    • sourcetype
  • Remove fields:
    • *

Final eval statements for the tenable_sc_plugin

Create Your Routes

With all of the infrastructure above set up, you can finish your implementation by creating routes that connect your new collectors to Splunk using the pipelines defined above.
