Author
Landon Lewis
Field CTO
In my last post, How to Use Axiom with OPNsense logs, I showed how to ingest and query OPNsense events in Axiom. Because Axiom removes the compromises other platforms force on you to keep costs down as your logging volume scales, it opens the door to enriching these data sources for additional insight.
One enrichment example is to take open-source threat intelligence from a code repository like this one, which formats the data into a structure suited to comparison, and then tag an event whenever a match is found. Enriching data in near real time has advantages over scheduled searches or historical analysis. As David Bianco’s “Pyramid of Pain” illustrates, the type and quality of the intelligence matters, and any of these feeds could be swapped for the output of your own Detection Engineering efforts.
The point here is to demonstrate that high-volume data sources, even with additional data added, can prove valuable, particularly if you are currently compromising on retention and would otherwise archive, drop, or only analyze this event data when more critical events demand it.
We start off by using Vector as a syslog aggregator. In some cases we instead use its socket source and send syslog payloads there, because many products fail to conform to the syslog standards RFC 3164 and RFC 5424. Vector uses “sources” as the way to get data into the system. You can use any port, but I picked ports ending in 514 (the default syslog port). Here is the configuration I’m using for that section:
# Syslog source
[sources.syslog]
type = "syslog"
mode = "udp"
address = "0.0.0.0:6514"
# Syslog socket source
[sources.socket]
type = "socket"
mode = "udp"
address = "0.0.0.0:10514"
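To check that a source is receiving data, it can help to send a hand-built test message. The sketch below is an illustration in Python and not part of the original setup: the helper names, the hostname `opnsense-test`, and the payload are hypothetical, and it assumes Vector is reachable on localhost at the UDP port configured for the syslog source.

```python
import socket
from datetime import datetime


def build_rfc3164_message(appname, message, hostname="opnsense-test",
                          facility=16, severity=6, now=None):
    """Format a BSD-syslog (RFC 3164) style line: <PRI>TIMESTAMP HOST TAG: MSG."""
    now = now or datetime.now()
    pri = facility * 8 + severity  # facility 16 (local0), severity 6 (info) -> 134
    # RFC 3164 pads single-digit days with a space, e.g. "Jan  5".
    timestamp = f"{now.strftime('%b')} {now.day:2d} {now.strftime('%H:%M:%S')}"
    return f"<{pri}>{timestamp} {hostname} {appname}: {message}"


def send_udp(payload, host="127.0.0.1", port=6514):
    """Send one datagram. UDP gives no delivery confirmation."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload.encode("utf-8"), (host, port))
```

Calling `send_udp(build_rfc3164_message("unbound", "test payload"))` should produce one event on the syslog source if everything is wired up. Because UDP is fire-and-forget, a missing event can mean either a listener problem or a dropped packet.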
Next we define an enrichment table holding the “values of interest” that we will compare against our ingested source data. The definition also sets the encoding type and the four fields of the CSV schema that we’d like to reference. The table points to a CSV file that we create with a small Python script. The script supports five different types of data, but this example only uses two of them.
# File source for enrichment data
[enrichment_tables.data]
type = "file"
file.path = "/opt/axiom/data.csv"
file.encoding.type = "csv"
schema.indicator = "string"
schema.source = "string"
schema.type = "string"
schema.indicator_type = "string"
The CSV file starts with a header row matching the schema. Here is one sample row for each of the five supported indicator types:
indicator,source,type,indicator_type
1.116.156.226,DRB-RA,C2,IP
accomodation-tastes.net,CPS-CTI,Amnesty_NSO_Domains,Domain
http://112.238.10.54:55242/i,ABUSE-CH,URLHAUS,URL
63de2b6188d5694e79b678f585b13264,SalesForce,JA3,Hash
orguk8244@gmail.com,CPS-CTI,PHISHING,Email
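The Python script that builds this file is not reproduced in this post. As a rough stand-in, the sketch below shows one way to write rows in the same four-column layout. The `Indicator` tuple, the `write_indicators` function, and the de-duplication step are illustrative assumptions, and the sample rows are copied from the listing above. De-duplicating on the indicator is a deliberate choice here: Vector's documentation describes `get_enrichment_table_record` as returning an error unless exactly one row matches, and the lookup in this post is keyed on the indicator alone.

```python
import csv
import io
from typing import Iterable, NamedTuple

FIELDNAMES = ["indicator", "source", "type", "indicator_type"]


class Indicator(NamedTuple):
    indicator: str
    source: str
    type: str
    indicator_type: str


def write_indicators(rows: Iterable[Indicator], fileobj) -> int:
    """Write a header plus rows with unique indicators; return the row count."""
    writer = csv.writer(fileobj, lineterminator="\n")
    writer.writerow(FIELDNAMES)
    seen = set()
    count = 0
    for row in rows:
        if row.indicator in seen:
            continue  # keep only the first row for each indicator
        seen.add(row.indicator)
        writer.writerow(row)
        count += 1
    return count


SAMPLE = [
    Indicator("1.116.156.226", "DRB-RA", "C2", "IP"),
    Indicator("accomodation-tastes.net", "CPS-CTI", "Amnesty_NSO_Domains", "Domain"),
]

buffer = io.StringIO()
write_indicators(SAMPLE, buffer)
```

To produce the real file, the same function could be pointed at `open("/opt/axiom/data.csv", "w", newline="")` instead of the in-memory buffer.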
Next we need to define the logic that does the comparison in Vector Remap Language (VRL). We start by filtering on the application name (appname) to select the filterlog and unbound logs. We then define how those messages are delimited and where the fields sit in each event. Lastly, we call “get_enrichment_table_record”, add error handling, and specify what should be added to the event when there is a match.
For IP address matches I set a “threat_id” field to “yes” and pull in the source and type as well. With access to more attributable data, a variable or a hyperlink to a Threat Intelligence Platform (TIP) search could be added here.
[transforms.remap]
type = "remap"
inputs = ["syslog"]
source = '''
if .appname == "filterlog" {
  filter_parts, err = split(.message, ",")
  # Extract IPs from the filter_parts
  .source_ip = filter_parts[18]
  .destination_ip = filter_parts[19]
  # For destination IP enrichment
  destination_ip_enriched, err = get_enrichment_table_record("data", { "indicator": to_string(.destination_ip) })
  if err == null && destination_ip_enriched.indicator_type == "IP" {
    .threat_id = "yes"
    .threat.source = destination_ip_enriched.source
    .threat.type = destination_ip_enriched.type
  }
}
if .appname == "unbound" {
  domain_parts, err = split(.message, " ")
  # Extract domain from the domain_parts
  .domain = domain_parts[2]
  if exists(.domain) && !is_null(.domain) && ends_with(to_string(.domain), ".") {
    sliced_domain, err = slice(.domain, 0, -1)
    if err == null {
      .domain = sliced_domain
    } else {
      # Handle the error case, such as logging the error, or setting a default value, etc.
      log("Failed to slice the domain: " + err)
    }
  }
  if exists(.domain) {
    domain_string, err = to_string(.domain)
    if err == null {
      domain_enriched, err = get_enrichment_table_record("data", { "indicator": domain_string })
      if err == null && domain_enriched.indicator_type == "Domain" {
        .threat_id = "yes"
        .threat.source = domain_enriched.source
        .threat.type = domain_enriched.type
      }
    }
  }
}
'''
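To reason about the matching logic without running Vector, the same steps can be mirrored in a few lines of Python. This is only a sketch for offline checking, not something Vector executes: the `enrich` and `_tag` helpers are hypothetical, a plain dictionary stands in for the enrichment table, and any messages you feed it are up to you. The field positions (18 and 19 for filterlog, 2 for unbound) are taken from the VRL above.

```python
def _tag(event, row, expected_type):
    """Add threat fields when a row was found and its indicator type matches."""
    if row is not None and row["indicator_type"] == expected_type:
        event["threat_id"] = "yes"
        event["threat"] = {"source": row["source"], "type": row["type"]}


def enrich(event, table):
    """Return a copy of event, tagged when its IP or domain is in table.

    table maps indicator -> row dict and stands in for the enrichment table.
    """
    event = dict(event)
    appname = event.get("appname")
    message = event.get("message", "")
    if appname == "filterlog":
        parts = message.split(",")
        # Same positions as the VRL: 18 = source IP, 19 = destination IP.
        if len(parts) > 19:
            event["source_ip"] = parts[18]
            event["destination_ip"] = parts[19]
            _tag(event, table.get(parts[19]), "IP")
    elif appname == "unbound":
        parts = message.split(" ")
        if len(parts) > 2:
            # Strip the trailing dot of a fully qualified name.
            domain = parts[2][:-1] if parts[2].endswith(".") else parts[2]
            event["domain"] = domain
            _tag(event, table.get(domain), "Domain")
    return event


TABLE = {
    "1.116.156.226": {"source": "DRB-RA", "type": "C2", "indicator_type": "IP"},
    "accomodation-tastes.net": {
        "source": "CPS-CTI",
        "type": "Amnesty_NSO_Domains",
        "indicator_type": "Domain",
    },
}
```

One difference worth noting: the dictionary lookup simply returns nothing for an unknown indicator, whereas the VRL relies on the returned error to skip events with no match.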
Now, within Axiom we can use APL to write a simple query to return the results of matches.