-
Notifications
You must be signed in to change notification settings - Fork 435
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create MACO extractors that wrap the CAPE extractor (#2373)
* Create a AgentTesla MACO extractor that wraps the CAPE extractor * Add MACO extractors with tests * Create MACO extractors that wrap the CAPE extractor * Add maco dependency * Patch yara rule name * Update poetry lockfile * Run Ruff
- Loading branch information
Showing
98 changed files
with
2,943 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.AgentTesla import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict) -> MACOModel: | ||
if not raw_config: | ||
return | ||
|
||
protocol = raw_config.get('Protocol') | ||
if not protocol: | ||
return | ||
|
||
parsed_result = MACOModel(family="AgentTesla", other=raw_config) | ||
if protocol == "Telegram": | ||
parsed_result.http.append( | ||
MACOModel.Http(uri=raw_config["C2"], | ||
password=raw_config["Password"], | ||
usage="c2") | ||
) | ||
|
||
elif protocol in ["HTTP(S)", "Discord"]: | ||
parsed_result.http.append( | ||
MACOModel.Http(uri=raw_config["C2"], | ||
usage="c2") | ||
) | ||
|
||
elif protocol == "FTP": | ||
parsed_result.ftp.append( | ||
MACOModel.FTP(username=raw_config["Username"], | ||
password=raw_config["Password"], | ||
hostname=raw_config["C2"].replace('ftp://', ''), | ||
usage="c2") | ||
) | ||
|
||
elif protocol == "SMTP": | ||
smtp = dict(username=raw_config["Username"], | ||
password=raw_config["Password"], | ||
hostname=raw_config["C2"], | ||
mail_to=[raw_config["EmailTo"]], | ||
usage="c2") | ||
if "Port" in raw_config: | ||
smtp["port"] = raw_config["Port"] | ||
parsed_result.smtp.append(MACOModel.SMTP(**smtp)) | ||
|
||
if "Persistence_Filename" in raw_config: | ||
parsed_result.paths.append(MACOModel.Path(path=raw_config["Persistence_Filename"], usage="storage")) | ||
|
||
if "ExternalIPCheckServices" in raw_config: | ||
for service in raw_config["ExternalIPCheckServices"]: | ||
parsed_result.http.append(MACOModel.Http(uri=service, usage="other")) | ||
|
||
|
||
return parsed_result | ||
|
||
class AgentTesla(Extractor): | ||
author = "kevoreilly" | ||
family = "AgentTesla" | ||
last_modified = "2024-10-20" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = open(os.path.join(os.path.dirname(__file__).split('/modules', 1)[0], f"data/yara/CAPE/{family}.yar")).read() | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.AsyncRAT import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict) -> MACOModel: | ||
if not raw_config: | ||
return | ||
|
||
parsed_result = MACOModel(family="AsyncRAT", other=raw_config) | ||
|
||
# Mutex | ||
parsed_result.mutex.append(raw_config["Mutex"]) | ||
|
||
# Version | ||
parsed_result.version = raw_config["Version"] | ||
|
||
# Was persistence enabled? | ||
if raw_config['Install'] == 'true': | ||
parsed_result.capability_enabled.append('persistence') | ||
else: | ||
parsed_result.capability_disabled.append('persistence') | ||
|
||
# Installation Path | ||
if raw_config.get('Folder'): | ||
parsed_result.paths.append(MACOModel.Path(path=os.path.join(raw_config['Folder'], raw_config['Filename']), | ||
usage="install")) | ||
|
||
# C2s | ||
for i in range(len(raw_config.get('C2s', []))): | ||
parsed_result.http.append(MACOModel.Http(hostname=raw_config["C2s"][i], | ||
port=int(raw_config["Ports"][i]), | ||
usage="c2")) | ||
# Pastebin | ||
if raw_config.get("Pastebin") not in ["null", None]: | ||
# TODO: Is it used to download the C2 information if not embedded? | ||
# Ref: https://www.netskope.com/blog/asyncrat-using-fully-undetected-downloader | ||
parsed_result.http.append(MACOModel.Http(uri=raw_config["Pastebin"], | ||
usage="download")) | ||
|
||
return parsed_result | ||
|
||
class AsyncRAT(Extractor): | ||
author = "kevoreilly" | ||
family = "AsyncRAT" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = open(os.path.join(os.path.dirname(__file__).split('/modules', 1)[0], f"data/yara/CAPE/{family}.yar")).read() | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.AuroraStealer import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="AuroraStealer", other=raw_config) | ||
if raw_config.get('C2'): | ||
# IP related to C2 | ||
parsed_result.http.append(MACOModel.Http(hostname=raw_config['C2'], | ||
usage="c2")) | ||
|
||
return parsed_result | ||
|
||
class AuroraStealer(Extractor): | ||
author = "kevoreilly" | ||
family = "AuroraStealer" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = open(os.path.join(os.path.dirname(__file__).split('/modules', 1)[0], f"data/yara/CAPE/{family}.yar")).read() | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.Azorult import extract_config, rule_source | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
return MACOModel(family="Azorult", http=[MACOModel.Http(hostname=raw_config["address"])], other=raw_config) | ||
|
||
|
||
class Azorult(Extractor): | ||
author = "kevoreilly" | ||
family = "Azorult" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = rule_source | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.BackOffLoader import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="BackOffLoader", other=raw_config) | ||
|
||
# Version | ||
parsed_result.version = raw_config['Version'] | ||
|
||
# Encryption details | ||
parsed_result.encryption.append(MACOModel.Encryption(algorithm="rc4", | ||
key=raw_config['EncryptionKey'], | ||
seed=raw_config['RC4Seed'])) | ||
for url in raw_config['URLs']: | ||
parsed_result.http.append(MACOModel.Http(url=url)) | ||
|
||
return parsed_result | ||
|
||
|
||
class BackOffLoader(Extractor): | ||
author = "kevoreilly" | ||
family = "BackOffLoader" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.BackOffPOS import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="BackOffPOS", other=raw_config) | ||
|
||
# Version | ||
parsed_result.version = raw_config['Version'] | ||
|
||
# Encryption details | ||
parsed_result.encryption.append(MACOModel.Encryption(algorithm="rc4", | ||
key=raw_config['EncryptionKey'], | ||
seed=raw_config['RC4Seed'])) | ||
for url in raw_config['URLs']: | ||
parsed_result.http.append(MACOModel.Http(url=url)) | ||
|
||
return parsed_result | ||
|
||
|
||
class BackOffPOS(Extractor): | ||
author = "kevoreilly" | ||
family = "BackOffPOS" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.BitPaymer import extract_config, rule_source | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="BitPaymer", other=raw_config) | ||
|
||
# Extracted strings | ||
parsed_result.decoded_strings = raw_config["strings"] | ||
|
||
# Encryption details | ||
parsed_result.encryption.append(MACOModel.Encryption(algorithm="rsa", | ||
public_key=raw_config['RSA public key'])) | ||
return parsed_result | ||
|
||
class BitPaymer(Extractor): | ||
author = "kevoreilly" | ||
family = "BitPaymer" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = rule_source | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.BlackDropper import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="BlackDropper", campaign_id=[raw_config["campaign"]], other=raw_config) | ||
|
||
for dir in raw_config.get("directories", []): | ||
parsed_result.paths.append(MACOModel.Path(path=dir)) | ||
|
||
for url in raw_config.get("urls", []): | ||
parsed_result.http.append(MACOModel.Http(uri=url)) | ||
|
||
return parsed_result | ||
|
||
class BlackDropper(Extractor): | ||
author = "kevoreilly" | ||
family = "BlackDropper" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = open(os.path.join(os.path.dirname(__file__).split('/modules', 1)[0], f"data/yara/CAPE/{family}.yar")).read() | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.BlackNix import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="BlackNix", other=raw_config) | ||
|
||
# Mutex | ||
parsed_result.mutex.append(raw_config['Mutex']) | ||
|
||
# Capabilities that are enabled/disabled | ||
# TODO: Review if these are all capabilities set by a boolean flag | ||
for capa in ["Anti Sandboxie", "Kernel Mode Unhooking", "User Mode Unhooking", | ||
"Melt Server", "Offline Screen Capture", "Offline Keylogger", "Copy to ADS", | ||
"Safe Mode Startup", "Inject winlogon.exe", "Active X Run", "Registry Run"]: | ||
if raw_config[capa].lower() == "true": | ||
parsed_result.capability_enabled.append(capa) | ||
else: | ||
parsed_result.capability_disabled.append(capa) | ||
|
||
# Delay Time | ||
parsed_result.sleep_delay = raw_config["Delay Time"] | ||
|
||
# Password | ||
parsed_result.password.append(raw_config["Password"]) | ||
|
||
# C2 Domain | ||
parsed_result.http.append(MACOModel.Http(hostname=raw_config['Domain'], | ||
usage="c2")) | ||
# Registry | ||
parsed_result.registry.append(MACOModel.Registry(key=raw_config["Registry Key"])) | ||
|
||
# Install Path | ||
parsed_result.paths.append(MACOModel.Path(path=os.path.join(raw_config['Install Path'], raw_config["Install Name"]), | ||
usage="install")) | ||
|
||
# Campaign Group/Name | ||
parsed_result.campaign_id = [raw_config["Campaign Name"], raw_config["Campaign Group"]] | ||
return parsed_result | ||
|
||
|
||
class BlackNix(Extractor): | ||
author = "kevoreilly" | ||
family = "BlackNix" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import os | ||
from maco.model import ExtractorModel as MACOModel | ||
from maco.extractor import Extractor | ||
from modules.processing.parsers.CAPE.Blister import extract_config | ||
|
||
def convert_to_MACO(raw_config: dict): | ||
if not raw_config: | ||
return None | ||
|
||
parsed_result = MACOModel(family="Blister", other=raw_config) | ||
|
||
for capa in ["Persistence", "Sleep after injection"]: | ||
if raw_config[capa]: | ||
parsed_result.capability_enabled.append(capa) | ||
else: | ||
parsed_result.capability_disabled.append(capa) | ||
|
||
# Rabbit encryption | ||
parsed_result.encryption.append(MACOModel.Encryption(algorithm="rabbit", | ||
key=raw_config["Rabbit key"], | ||
iv=raw_config["Rabbit IV"])) | ||
return parsed_result | ||
|
||
class Blister(Extractor): | ||
author = "kevoreilly" | ||
family = "Blister" | ||
last_modified = "2024-10-26" | ||
sharing = "TLP:CLEAR" | ||
yara_rule = open(os.path.join(os.path.dirname(__file__).split('/modules', 1)[0], f"data/yara/CAPE/{family}.yar")).read() | ||
|
||
def run(self, stream, matches): | ||
return convert_to_MACO(extract_config(stream.read())) |
Oops, something went wrong.