Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create onepassword_ssh_key plugin #9580

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/BOTMETA.yml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ files:
$lookups/onepassword_raw.py:
ignore: scottsb
maintainers: azenk
$lookups/onepassword_ssh_key.py:
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
maintainers: mohammedbabelly20
$lookups/passwordstore.py: {}
$lookups/random_pet.py:
maintainers: Akasurde
Expand Down
124 changes: 124 additions & 0 deletions plugins/lookup/onepassword_ssh_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2025, Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

DOCUMENTATION = """
name: onepassword_ssh_key
author:
- Mohammed Babelly (@mohammedbabelly20)
requirements:
- C(op) 1Password command line utility version 2 or later.
short_description: Fetch SSH keys stored in 1Password
version_added: "10.3.0"
description:
- P(community.general.onepassword_ssh_key#lookup) wraps C(op) command line utility to fetch SSH keys from 1Password.
notes:
- By default, it returns the private key value in PKCS#8 format, unless O(ssh_format=true) is passed.
- The pluging works only for C(SSHKEY) type items.
- This plugin requires C(op) version 2 or later.

options:
_terms:
description: Identifier(s) (case-insensitive UUID or name) of item(s) to retrieve.
required: true
type: list
elements: string
ssh_format:
description: Output key in SSH format if V(true). Otherwise, outputs in the default format (PKCS#8).
default: false
type: bool

extends_documentation_fragment:
- community.general.onepassword
- community.general.onepassword.lookup
"""

EXAMPLES = """
- name: Retrieve the private SSH key from 1Password
ansible.builtin.debug:
msg: "{{ lookup('community.general.onepassword_ssh_key', 'SSH Key', ssh_format=true) }}"
"""

RETURN = """
_raw:
description: Private key of SSH keypair.
type: list
elements: string
"""
import json

from ansible_collections.community.general.plugins.lookup.onepassword import (
OnePass,
OnePassCLIv2,
)
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
from ansible.errors import AnsibleLookupError
from ansible.plugins.lookup import LookupBase


class OnePassCLIv2SSHKey(OnePassCLIv2):

def get_ssh_key(self, item_id, vault=None, token=None, ssh_format=False):
rc, out, err = self.get_raw(item_id, vault=vault, token=token)

data = json.loads(out)

if data.get("category") != "SSH_KEY":
raise AnsibleLookupError(f"Item {item_id} is not an SSH key")

private_key_field = next(
(
field
for field in data.get("fields", {})
if field.get("id") == "private_key" and field.get("type") == "SSHKEY"
),
None,
)
if not private_key_field:
raise AnsibleLookupError(f"No private key found for item {item_id}.")

if ssh_format:
return (
private_key_field.get("ssh_formats", {})
.get("openssh", {})
.get("value", "")
)
return private_key_field.get("value", "")


class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs):
self.set_options(var_options=variables, direct=kwargs)

ssh_format = self.get_option("ssh_format")
vault = self.get_option("vault")
subdomain = self.get_option("subdomain")
domain = self.get_option("domain", "1password.com")
username = self.get_option("username")
secret_key = self.get_option("secret_key")
master_password = self.get_option("master_password")
service_account_token = self.get_option("service_account_token")
account_id = self.get_option("account_id")
connect_host = self.get_option("connect_host")
connect_token = self.get_option("connect_token")

op = OnePass(
subdomain=subdomain,
domain=domain,
username=username,
secret_key=secret_key,
master_password=master_password,
service_account_token=service_account_token,
account_id=account_id,
connect_host=connect_host,
connect_token=connect_token,
cli_class=OnePassCLIv2SSHKey,
)
op.assert_logged_in()

return [
op._cli.get_ssh_key(term, vault, token=op.token, ssh_format=ssh_format)
for term in terms
]
36 changes: 36 additions & 0 deletions tests/unit/plugins/lookup/onepassword_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,39 @@ def load_file(file):
},
],
}

SSH_KEY_MOCK_ENTRIES = [
# loads private key in PKCS#8 format by default
{
"vault_name": "Personal",
"queries": ["ssh key"],
"expected": [
"-----BEGIN PRIVATE KEY-----\n..........=\n-----END PRIVATE KEY-----\n"
],
"output": load_file("ssh_key_output.json"),
},
# loads private key in PKCS#8 format becasue ssh_format=false
{
"vault_name": "Personal",
"queries": ["ssh key"],
"kwargs": {
"ssh_format": False,
},
"expected": [
"-----BEGIN PRIVATE KEY-----\n..........=\n-----END PRIVATE KEY-----\n"
],
"output": load_file("ssh_key_output.json"),
},
# loads private key in ssh format
{
"vault_name": "Personal",
"queries": ["ssh key"],
"kwargs": {
"ssh_format": True,
},
"expected": [
"-----BEGIN OPENSSH PRIVATE KEY-----\r\n.....\r\n-----END OPENSSH PRIVATE KEY-----\r\n"
],
"output": load_file("ssh_key_output.json"),
},
]
57 changes: 57 additions & 0 deletions tests/unit/plugins/lookup/onepassword_fixtures/ssh_key_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"id": "wdtryfeh3jlx2dlanqgg4dqxmy",
"title": "ssh key",
"version": 1,
"vault": {
"id": "5auhrjy66hc7ndhe2wvym6gadv",
"name": "Personal"
},
"category": "SSH_KEY",
"last_edited_by": "LSGPJERUYBH7BFPHMZ2KKGL6AU",
"created_at": "2025-01-10T16:57:16Z",
"updated_at": "2025-01-10T16:57:16Z",
"additional_information": "SHA256:frHmQAgblahD5HHgNj2O714",
"fields": [
{
"id": "public_key",
"type": "STRING",
"label": "public key",
"value": "ssh-ed255.....",
"reference": "op://Personal/ssh key/public key"
},
{
"id": "fingerprint",
"type": "STRING",
"label": "fingerprint",
"value": "SHA256:frHmQAgy7zBKeFDxHMW0QltZ/5O4N8gD5HHgNj2O614",
"reference": "op://Personal/ssh key/fingerprint"
},
{
"id": "private_key",
"type": "SSHKEY",
"label": "private key",
"value": "-----BEGIN PRIVATE KEY-----\n..........=\n-----END PRIVATE KEY-----\n",
"reference": "op://Personal/ssh key/private key",
"ssh_formats": {
"openssh": {
"reference": "op://Personal/ssh key/private key?ssh-format=openssh",
"value": "-----BEGIN OPENSSH PRIVATE KEY-----\r\n.....\r\n-----END OPENSSH PRIVATE KEY-----\r\n"
}
}
},
{
"id": "key_type",
"type": "STRING",
"label": "key type",
"value": "ed25519",
"reference": "op://Personal/ssh key/key type"
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"reference": "op://Personal/ssh key/notesPlain"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: 2025, Ansible Project
30 changes: 30 additions & 0 deletions tests/unit/plugins/lookup/test_onepassword_ssh_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2025 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json
import pytest

from .onepassword_common import SSH_KEY_MOCK_ENTRIES

from ansible.plugins.loader import lookup_loader


@pytest.mark.parametrize(
("vault", "queries", "kwargs", "output", "expected"),
(
(item["vault_name"], item["queries"], item.get("kwargs", {}), item["output"], item["expected"])
for item in SSH_KEY_MOCK_ENTRIES
)
)
def test_ssh_key(mocker, vault, queries, kwargs, output, expected):
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePass.assert_logged_in", return_value=True)
mocker.patch("ansible_collections.community.general.plugins.lookup.onepassword.OnePassCLIBase._run", return_value=(0, json.dumps(output), ""))

op_lookup = lookup_loader.get("community.general.onepassword_ssh_key")
result = op_lookup.run(queries, vault=vault, **kwargs)

assert result == expected
Loading