Enriching Login Attempts with Wazuh and AbuseIPDB

SOCFortress
6 min read · Feb 4, 2022

Use Case

Remote system access lets IT professionals respond quickly to application and system level issues. Oftentimes this is accomplished via the SSH protocol. While convenient for legitimate users, a publicly exposed SSH port is also an immediate source of excitement for an attacker.

As defenders, we need a way to detect known bad actors. Luckily, Wazuh, paired with a custom integration script, can be configured to send API requests to AbuseIPDB. Coupled with Wazuh rules, the InfoSec team will immediately be made aware when a known aggressive IP has attempted to SSH into one of their servers.

Wazuh Rule

We first want to create a rule that generates an alert when an IP address outside the private ranges (10.x.x.x, 172.16.x.x to 172.31.x.x, 192.168.x.x) has attempted to log into our server. This allows us to distinguish between malicious insiders and those attempting to gain access via the internet.

<group name="local,syslog,sshd,">
  <rule id="100001" level="10">
    <if_sid>5716</if_sid>
    <match type="pcre2">\b(?!10\.|192\.168\.|172\.(2[0-9]|1[6-9]|3[0-1])\.|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address $(srcip).</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>

The above “match” block specifies that we want to perform a regex search with Perl Compatible Regular Expressions (PCRE2). The “(?!…)” group is a negative lookahead: it tells the Wazuh engine to reject any address that begins with one of the listed private prefixes, so only public IPs match.
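To see what a negative lookahead does outside of Wazuh, here is a minimal Python sketch. It uses a simplified pattern of the same shape rather than the rule's exact expression, and the names `PUBLIC_IPV4` and `first_public_ip` are illustrative only. Python's `re` module is not PCRE2, but it supports the same `(?!...)` lookahead syntax.

```python
import re

# One IPv4 octet, 0-255.
OCTET = r"(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])"

# Simplified illustration, not a copy of the Wazuh rule:
# the negative lookahead (?!...) rejects a candidate address that starts
# with a private prefix (10/8, 172.16/12, 192.168/16).
PUBLIC_IPV4 = re.compile(
    r"(?<![0-9.])"                                         # not mid-address
    r"(?!10\.|192\.168\.|172\.(?:1[6-9]|2[0-9]|3[01])\.)"  # skip private ranges
    + OCTET + r"(?:\." + OCTET + r"){3}"
    r"(?![0-9])"                                           # no trailing digit
)


def first_public_ip(log_line):
    """Return the first public IPv4 address in log_line, or None."""
    found = PUBLIC_IPV4.search(log_line)
    return found.group(0) if found else None


print(first_public_ip("Failed password for root from 203.0.113.7 port 22 ssh2"))
print(first_public_ip("Failed password for root from 10.0.0.5 port 22 ssh2"))
```

The first call prints the address and the second prints `None`, because the lookahead vetoes the `10.` prefix. Note the trailing dots inside the lookahead: without them, a prefix such as `10` would also reject public addresses like 103.x.x.x.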

The above rule is what we will use to tell Wazuh when to make the API call out to AbuseIPDB.

AbuseIPDB Script

The beauty of Wazuh and all other Open Source tools is that we can create our own integrations. We will create a Python script that takes the source IP that triggered our above rule and sends it to AbuseIPDB to get the IP’s reputation. We will store this script in the /var/ossec/integrations/ directory of our Wazuh Managers.

#!/var/ossec/framework/python/bin/python3
# SOCFortress
# https://www.socfortress.co
# info@socfortress.co
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
    from requests.auth import HTTPBasicAuth
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)


def main(args):
    debug("# Starting")
    # Read args
    alert_file_location = args[1]
    apikey = args[2]
    debug("# API Key")
    debug(apikey)
    debug("# File location")
    debug(alert_file_location)
    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)
    # Request AbuseIPDB info
    msg = request_abuseipdb_info(json_alert, apikey)
    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])


def debug(msg):
    # Print and log the message only when the script is run with "debug"
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        f = open(log_file, "a")
        f.write(msg)
        f.close()


def collect(data):
    abuse_confidence_score = data['abuseConfidenceScore']
    country_code = data['countryCode']
    usage_type = data['usageType']
    isp = data['isp']
    domain = data['domain']
    total_reports = data['totalReports']
    last_reported_at = data['lastReportedAt']
    return abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at


def in_database(data, srcip):
    result = data['totalReports']
    if result == 0:
        return False
    return True


def query_api(srcip, apikey):
    params = {'maxAgeInDays': '90', 'ipAddress': srcip,}
    headers = {
        "Accept-Encoding": "gzip, deflate",
        'Accept': 'application/json',
        "Key": apikey
    }
    response = requests.get('https://api.abuseipdb.com/api/v2/check', params=params, headers=headers)
    if response.status_code == 200:
        json_response = response.json()
        data = json_response["data"]
        return data
    else:
        # Report the API error back to the Wazuh manager, then stop
        alert_output = {}
        alert_output["abuseipdb"] = {}
        alert_output["integration"] = "custom-abuseipdb"
        json_response = response.json()
        debug("# Error: The AbuseIPDB encountered an error")
        alert_output["abuseipdb"]["error"] = response.status_code
        alert_output["abuseipdb"]["description"] = json_response["errors"][0]["detail"]
        send_event(alert_output)
        sys.exit(0)


def request_abuseipdb_info(alert, apikey):
    alert_output = {}
    # If there is no source ip address present in the alert. Exit.
    if not "srcip" in alert["data"]:
        return(0)
    # Request info using AbuseIPDB API
    data = query_api(alert["data"]["srcip"], apikey)
    # Create alert
    alert_output["abuseipdb"] = {}
    alert_output["integration"] = "custom-abuseipdb"
    alert_output["abuseipdb"]["found"] = 0
    alert_output["abuseipdb"]["source"] = {}
    alert_output["abuseipdb"]["source"]["alert_id"] = alert["id"]
    alert_output["abuseipdb"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["abuseipdb"]["source"]["description"] = alert["rule"]["description"]
    alert_output["abuseipdb"]["source"]["full_log"] = alert["full_log"]
    alert_output["abuseipdb"]["source"]["srcip"] = alert["data"]["srcip"]
    srcip = alert["data"]["srcip"]
    # Check if AbuseIPDB has any info about the srcip
    if in_database(data, srcip):
        alert_output["abuseipdb"]["found"] = 1
    # Info about the IP found in AbuseIPDB
    if alert_output["abuseipdb"]["found"] == 1:
        abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at = collect(data)
        # Populate JSON Output object with AbuseIPDB request
        alert_output["abuseipdb"]["abuse_confidence_score"] = abuse_confidence_score
        alert_output["abuseipdb"]["country_code"] = country_code
        alert_output["abuseipdb"]["usage_type"] = usage_type
        alert_output["abuseipdb"]["isp"] = isp
        alert_output["abuseipdb"]["domain"] = domain
        alert_output["abuseipdb"]["total_reports"] = total_reports
        alert_output["abuseipdb"]["last_reported_at"] = last_reported_at
    debug(alert_output)
    return(alert_output)


def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:abuseipdb:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->abuseipdb:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))
    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()


if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True
        # Logging the call
        f = open(log_file, 'a')
        f.write(msg + '\n')
        f.close()
        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)
        # Main function
        main(sys.argv)
    except Exception as e:
        debug(str(e))
        raise

Save this script as “custom-abuseipdb.py”
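Before wiring the script into Wazuh, it can help to see the shape of the event it produces. The sketch below mirrors the script's logic without calling AbuseIPDB or touching the Wazuh socket. The helper names (`FIELD_MAP`, `build_enrichment`, `queue_message`) and all sample values are hypothetical; only the field names and the message format are taken from the script above.

```python
import json

# Keys the script reads from the AbuseIPDB "data" object, mapped to the
# snake_case names it writes into the Wazuh event.
FIELD_MAP = {
    "abuseConfidenceScore": "abuse_confidence_score",
    "countryCode": "country_code",
    "usageType": "usage_type",
    "isp": "isp",
    "domain": "domain",
    "totalReports": "total_reports",
    "lastReportedAt": "last_reported_at",
}


def build_enrichment(alert, data):
    """Build the event body the script would emit, with no network call."""
    body = {
        "found": 0,
        "source": {
            "alert_id": alert["id"],
            "rule": alert["rule"]["id"],
            "description": alert["rule"]["description"],
            "full_log": alert["full_log"],
            "srcip": alert["data"]["srcip"],
        },
    }
    if data["totalReports"] != 0:  # same check as in_database()
        body["found"] = 1
        for api_key, event_key in FIELD_MAP.items():
            body[event_key] = data[api_key]
    return {"abuseipdb": body, "integration": "custom-abuseipdb"}


def queue_message(msg, agent=None):
    """Format the string that send_event() writes to the queue socket."""
    payload = json.dumps(msg)
    if not agent or agent["id"] == "000":
        return "1:abuseipdb:{0}".format(payload)
    return "1:[{0}] ({1}) {2}->abuseipdb:{3}".format(
        agent["id"], agent["name"], agent.get("ip", "any"), payload
    )


# Hypothetical sample values, shaped like the fields the script reads.
sample_alert = {
    "id": "1643970000.12345",
    "rule": {"id": "100001", "description": "sshd: Authentication failed"},
    "full_log": "sshd[1234]: Failed password for root from 203.0.113.7 port 22 ssh2",
    "data": {"srcip": "203.0.113.7"},
}
sample_data = {
    "abuseConfidenceScore": 100,
    "countryCode": "ZZ",
    "usageType": "Data Center/Web Hosting/Transit",
    "isp": "Example ISP",
    "domain": "example.com",
    "totalReports": 42,
    "lastReportedAt": "2022-02-04T00:00:00+00:00",
}

event = build_enrichment(sample_alert, sample_data)
print(queue_message(event, {"id": "001", "name": "web01"}))
```

The nested `abuseipdb.abuse_confidence_score` key in this event is the field a follow-up Wazuh rule can match on.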

Update: this step has since been removed. If you are following the OpenSecure video, skip it and continue to the “PROCEED” section.

Otherwise, create the following wrapper script (the original “symlink” step) in the same directory:

#!/bin/sh
# SOCFortress
# https://www.socfortress.co
# info@socfortress.co
#
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
WPYTHON_BIN="framework/python/bin/python3"

SCRIPT_PATH_NAME="$0"

DIR_NAME="$(cd $(dirname ${SCRIPT_PATH_NAME}); pwd -P)"
SCRIPT_NAME="$(basename ${SCRIPT_PATH_NAME})"

case ${DIR_NAME} in
    */active-response/bin | */wodles*)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/../..; pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
    */bin)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/${SCRIPT_NAME}.py"
    ;;
    */integrations)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
esac

${WAZUH_PATH}/${WPYTHON_BIN} ${PYTHON_SCRIPT} "$@"

Save this script as “custom-abuseipdb”

PROCEED

Change the owner and group, and make both scripts executable, with the commands below:

chmod 750 /var/ossec/integrations/custom-abuseipdb*
chown root:ossec /var/ossec/integrations/custom-abuseipdb*

Listing the directory should now show the script files owned by root:ossec with mode 750 (rwxr-x---).
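If you prefer to check the permissions programmatically, the following is a small sketch using only the Python standard library. The helper name `mode_matches` is hypothetical, and the path in the usage comment is simply the one used in this post.

```python
import os
import stat


def mode_matches(path, expected=0o750):
    """Return True if the permission bits of path equal expected."""
    return stat.S_IMODE(os.stat(path).st_mode) == expected


# Example (run on the Wazuh manager):
# mode_matches("/var/ossec/integrations/custom-abuseipdb.py")
```

This only compares the permission bits; ownership would still need to be confirmed separately, for example with `ls -l`.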

Integration Configuration

The Wazuh manager configuration file “ossec.conf” needs to be modified with the integration block like below:

Note: we are now including the “.py” extension in the “<name>” block.

<integration>
  <name>custom-abuseipdb.py</name>
  <hook_url>https://api.abuseipdb.com/api/v2/check</hook_url>
  <api_key><YOUR_ABUSEIPDB_API_KEY></api_key>
  <level>10</level>
  <rule_id>100001</rule_id>
  <alert_format>json</alert_format>
</integration>

This instructs our Wazuh manager to make the API call out to AbuseIPDB anytime our rule id (100001), which we created above, is triggered. Make sure you replace the “api_key” value with your own. You can register for a free API key at https://www.abuseipdb.com/.
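Judging by how the script reads `sys.argv`, it expects to be called with the alert file path, the API key, and the hook URL, in that order, plus an optional `debug` flag. The sketch below writes a sample alert file and assembles such a command line for a manual dry run. The helper names and every value in the sample alert are hypothetical; only the keys are taken from what the script reads.

```python
import json
import os
import tempfile

SCRIPT = "/var/ossec/integrations/custom-abuseipdb.py"
HOOK_URL = "https://api.abuseipdb.com/api/v2/check"


def write_sample_alert(srcip, directory=None):
    """Write a minimal alert with the keys the script reads; return its path."""
    alert = {
        "id": "1643970000.12345",  # hypothetical alert id
        "rule": {
            "id": "100001",
            "level": 10,
            "description": "sshd: Authentication failed from a public IP address.",
        },
        "agent": {"id": "001", "name": "demo-agent", "ip": "192.0.2.10"},
        "full_log": "sshd[1234]: Failed password for root from {0} port 22 ssh2".format(srcip),
        "data": {"srcip": srcip},
    }
    fd, path = tempfile.mkstemp(suffix=".alert", dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(alert, handle)
    return path


def integration_argv(alert_path, api_key, hook_url=HOOK_URL, debug=False):
    """Build the argument list in the order the script reads it."""
    argv = [SCRIPT, alert_path, api_key, hook_url]
    if debug:
        argv.append("debug")
    return argv


alert_path = write_sample_alert("203.0.113.7")
print(" ".join(integration_argv(alert_path, "YOUR_ABUSEIPDB_API_KEY", debug=True)))
```

Running the printed command on the manager should exercise the script end to end, but note that it performs a real API request and writes to the Wazuh queue socket, so treat it as a test on a non-production system.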

Make sure to restart your Wazuh Manager so that the new changes take effect:

systemctl restart wazuh-manager

You can tail the /var/ossec/logs/integrations.log file to see our new integration script being triggered.

Since our demo server is exposed to the Internet, we quickly rack up alerts :).

Capturing the AbuseIPDB Response

Lastly, we need to capture the response sent back to the Wazuh manager so that our defense team knows when an abusive IP has been spotted, that is, one with a confidence score above 0. This helps us filter out false positives.

<group name="local,syslog,sshd,">
  <rule id="100002" level="10">
    <field name="abuseipdb.abuse_confidence_score" type="pcre2" negate="yes">^0$</field>
    <description>IP with $(abuseipdb.abuse_confidence_score)% confidence of abuse connected to your network.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>
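The rule's condition is a negated match: it fires when the confidence score does not match the pattern `^0$`. As a rough Python illustration (the function name `fires_for_score` is hypothetical, and it assumes the field is present in the event):

```python
import re


def fires_for_score(score):
    """Mimic negate="yes" on ^0$: alert unless the score is exactly 0."""
    return re.search(r"^0$", str(score)) is None


for value in (0, 10, 100):
    print(value, fires_for_score(value))
```

Anchoring the pattern with `^` and `$` matters here: an unanchored `0` would also match scores such as 10 or 100 and suppress exactly the alerts we care about.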

Restart your Wazuh manager one last time and watch your new highly intelligent alerts come in.

systemctl restart wazuh-manager

Takeaway

In this post, we integrated the AbuseIPDB API with Wazuh to check if IP addresses that attempted to SSH into our servers are known to be malicious. Stay tuned for a future post where we will look to actively defend our servers by blocking these malicious IPs in real time via the Active Response feature.

Need Help?

The functionality discussed in this post, and so much more, are available via the SOCFortress platform. Let SOCFortress help you and your team keep your infrastructure secure.

Website: https://www.socfortress.co/

Platform Demo: https://www.socfortress.co/demo_access.html

SOCFortress

SOCFortress is a SaaS company that unifies Observability, Security Monitoring, Threat Intelligence and Security Orchestration, Automation, and Response (SOAR).