PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation
In this tutorial, we build an advanced, Colab-ready workflow around PyGraphistry for interactive graph analytics and visualization. We start by creating a synthetic enterprise-style access dataset, transforming it into nodes and edges, and enriching the graph with risk scores, anomaly indicators, centrality metrics, community detection, and layout embeddings. We then use PyGraphistry to bind graph structure, visual encodings, labels, tooltips, and filtered subgraphs, and we fall back to local interactive PyVis visualizations when Graphistry credentials are not configured. Through this implementation, we see how graph intelligence helps us investigate suspicious users, risky devices, IP relationships, sensitive services, and high-risk behavioral patterns in a practical security analytics setting.
Installing PyGraphistry and Dependencies
import os, sys, subprocess, warnings, textwrap, json, math, random
warnings.filterwarnings("ignore")
def pip_install(packages):
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "-U", *packages], check=True)
pip_install([
    "graphistry[networkx,umap-learn]",
    "pandas",
    "numpy",
    "networkx",
    "scikit-learn",
    "pyvis",
    "matplotlib",
    "pyarrow"
])
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import graphistry
from pathlib import Path
from IPython.display import display, HTML, IFrame
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
from pyvis.network import Network
OUT_DIR = Path("/content/pygraphistry_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)
SEED = 42
rng = np.random.default_rng(SEED)
random.seed(SEED)
print("=" * 100)
print("PyGraphistry Advanced Colab Tutorial")
print("=" * 100)
print("This tutorial builds an enterprise-style access graph, computes graph analytics,")
print("creates suspicious subgraphs, exports graph artifacts, and optionally uploads")
print("interactive visualizations to Graphistry Hub if credentials are available.")
print("=" * 100)
def colab_secret(name, default=""):
    value = os.environ.get(name, default)
    try:
        from google.colab import userdata
        secret_value = userdata.get(name)
        if secret_value:
            value = secret_value
    except Exception:
        pass
    return value or default
GRAPHISTRY_SERVER = colab_secret("GRAPHISTRY_SERVER", "hub.graphistry.com")
GRAPHISTRY_PROTOCOL = colab_secret("GRAPHISTRY_PROTOCOL", "https")
GRAPHISTRY_USERNAME = colab_secret("GRAPHISTRY_USERNAME", "")
GRAPHISTRY_PASSWORD = colab_secret("GRAPHISTRY_PASSWORD", "")
GRAPHISTRY_PERSONAL_KEY_ID = colab_secret("GRAPHISTRY_PERSONAL_KEY_ID", "")
GRAPHISTRY_PERSONAL_KEY_SECRET = colab_secret("GRAPHISTRY_PERSONAL_KEY_SECRET", "")
REGISTERED = False
try:
    if GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET:
        graphistry.register(
            api=3,
            protocol=GRAPHISTRY_PROTOCOL,
            server=GRAPHISTRY_SERVER,
            personal_key_id=GRAPHISTRY_PERSONAL_KEY_ID,
            personal_key_secret=GRAPHISTRY_PERSONAL_KEY_SECRET
        )
        REGISTERED = True
        print("Graphistry registered with personal key credentials.")
    elif GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD:
        graphistry.register(
            api=3,
            protocol=GRAPHISTRY_PROTOCOL,
            server=GRAPHISTRY_SERVER,
            username=GRAPHISTRY_USERNAME,
            password=GRAPHISTRY_PASSWORD
        )
        REGISTERED = True
        print("Graphistry registered with username/password credentials.")
    else:
        graphistry.register(api=3, protocol=GRAPHISTRY_PROTOCOL, server=GRAPHISTRY_SERVER)
        print("No Graphistry credentials found. Local analytics will run; Graphistry .plot() uploads will be skipped.")
        print("To enable live Graphistry plots, add Colab secrets:")
        print("GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET")
        print("or GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD")
except Exception as e:
    REGISTERED = False
    print("Graphistry registration was not completed:", repr(e))
    print("Continuing with local analytics and local HTML visualization.")
def nid(kind, value):
    return f"{kind}:{value}"
We set up the complete Colab environment by installing PyGraphistry and all supporting libraries for graph analytics, visualization, and machine learning. We configure the output directory, random seed, and Graphistry credentials so the notebook works both locally and with Graphistry Hub. We also define a reusable helper for node naming to keep every entity type clearly separated in the graph.
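To see why the typed node IDs matter, here is a minimal standalone sketch. The `nid` function is the same helper used above; `split_nid` is a hypothetical inverse that is not part of the notebook, and it mirrors how the node table is later derived by splitting each ID on the first colon.

```python
def nid(kind, value):
    """Build a typed node ID such as 'user:user_001' (same helper as in the notebook)."""
    return f"{kind}:{value}"


def split_nid(node_id):
    """Hypothetical inverse of nid: split on the first colon only."""
    kind, value = node_id.split(":", 1)
    return kind, value


# Without the prefix, a user and a device that share the raw value "007"
# would collapse into a single graph node.
user_node = nid("user", "007")
device_node = nid("device", "007")
print(user_node, device_node)
print(split_nid("ip:10.0.1.37"))
```

Splitting on the first colon only keeps values that themselves contain a colon intact.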
Generating Enterprise Access Dataset
n_users = 55
n_devices = 42
n_ips = 36
n_services = 15
n_roles = 7
n_geos = 10
n_events = 2200
users = [f"user_{i:03d}" for i in range(n_users)]
devices = [f"device_{i:03d}" for i in range(n_devices)]
ips = [f"10.{i // 255}.{i % 255}.{rng.integers(1, 255)}" for i in range(1, n_ips + 1)]
services = [
"salesforce", "snowflake", "github", "jira", "slack",
"vpn", "okta", "aws_console", "gcp_console", "databricks",
"hris", "email", "crm", "vault", "payments_api"
]
roles = ["employee", "analyst", "engineer", "manager", "admin", "contractor", "service_account"]
geos = ["IN", "US", "GB", "DE", "SG", "AE", "BR", "NL", "AU", "JP"]
privileged_users = set(rng.choice(users, size=7, replace=False))
compromised_users = set(rng.choice(list(set(users) - privileged_users), size=4, replace=False))
risky_devices = set(rng.choice(devices, size=5, replace=False))
risky_ips = set(rng.choice(ips, size=5, replace=False))
sensitive_services = {"aws_console", "gcp_console", "vault", "payments_api", "snowflake"}
user_role = {}
for u in users:
    if u in privileged_users:
        user_role[u] = rng.choice(["admin", "manager", "engineer"], p=[0.55, 0.2, 0.25])
    elif rng.random() < 0.08:
        user_role[u] = "contractor"
    else:
        user_role[u] = rng.choice(["employee", "analyst", "engineer"], p=[0.45, 0.25, 0.30])
user_home_geo = {u: rng.choice(geos, p=[0.30, 0.22, 0.10, 0.08, 0.08, 0.05, 0.04, 0.04, 0.04, 0.05]) for u in users}
device_owner = {d: rng.choice(users) for d in devices}
base_time = pd.Timestamp("2026-06-01 00:00:00")
events = []
for i in range(n_events):
    if rng.random() < 0.18:
        user = rng.choice(list(compromised_users))
    else:
        user = rng.choice(users)
    if user in compromised_users and rng.random() < 0.42:
        device = rng.choice(list(risky_devices))
    else:
        owned = [d for d, owner in device_owner.items() if owner == user]
        device = rng.choice(owned if owned and rng.random() < 0.78 else devices)
    if user in compromised_users and rng.random() < 0.50:
        ip = rng.choice(list(risky_ips))
    else:
        ip = rng.choice(ips)
    if user in compromised_users and rng.random() < 0.45:
        service = rng.choice(list(sensitive_services))
    else:
        service = rng.choice(services)
    role = user_role[user]
    home_geo = user_home_geo[user]
    geo = home_geo if rng.random() < 0.88 else rng.choice([g for g in geos if g != home_geo])
    hour = int(rng.integers(0, 24))
    minute = int(rng.integers(0, 60))
    timestamp = base_time + pd.Timedelta(days=int(rng.integers(0, 10)), hours=hour, minutes=minute)
    impossible_travel = int(geo != home_geo and rng.random() < 0.65)
    off_hours = int(hour < 6 or hour > 21)
    service_sensitivity = 1.0 if service in sensitive_services else 0.25
    privileged = int(role in ["admin", "manager", "service_account"])
    compromised = int(user in compromised_users)
    risky_infra = int(device in risky_devices or ip in risky_ips)
    risk_score = (
        0.08
        + 0.22 * compromised
        + 0.18 * risky_infra
        + 0.17 * impossible_travel
        + 0.13 * off_hours
        + 0.15 * service_sensitivity
        + 0.07 * privileged
        + rng.normal(0, 0.06)
    )
    risk_score = float(np.clip(risk_score, 0.0, 1.0))
    success_probability = 0.96 - 0.45 * risk_score
    is_success = bool(rng.random() < success_probability)
    amount = float(np.round(np.exp(rng.normal(7.0 + 1.4 * service_sensitivity, 0.8)), 2))
    if service not in {"payments_api", "vault", "snowflake"}:
        amount = float(np.round(amount * rng.uniform(0.01, 0.10), 2))
    events.append({
        "event_id": f"evt_{i:05d}",
        "timestamp": timestamp,
        "user": user,
        "device": device,
        "ip": ip,
        "service": service,
        "role": role,
        "geo": geo,
        "home_geo": home_geo,
        "is_success": is_success,
        "off_hours": bool(off_hours),
        "impossible_travel": bool(impossible_travel),
        "risk_score": risk_score,
        "amount": amount,
        "is_seeded_compromise": bool(user in compromised_users),
        "is_privileged_user": bool(user in privileged_users),
        "is_risky_device": bool(device in risky_devices),
        "is_risky_ip": bool(ip in risky_ips)
    })
events_df = pd.DataFrame(events)
events_df["timestamp"] = pd.to_datetime(events_df["timestamp"])
print("\nRaw event sample:")
display(events_df.head(10))
print("\nSeeded high-risk entities used for validation:")
print("Compromised users:", sorted(compromised_users))
print("Risky devices:", sorted(risky_devices))
print("Risky IPs:", sorted(risky_ips))
We generate a realistic synthetic enterprise access dataset with users, devices, IPs, services, roles, and geographic locations. We simulate normal and suspicious behavior by adding compromised users, risky devices, risky IPs, off-hours activity, impossible travel, and sensitive service access. We convert these events into a structured DataFrame that serves as the foundation for our graph-based security investigation.
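The per-event risk score is a weighted sum of binary indicators plus Gaussian noise, clipped to [0, 1]. The sketch below reproduces that formula without the noise term so the contribution of each factor can be checked by hand. The function names `expected_risk` and `success_probability` are illustrative and do not appear in the notebook.

```python
def expected_risk(compromised, risky_infra, impossible_travel, off_hours,
                  service_sensitivity, privileged):
    """Noise-free version of the notebook's risk formula, clipped to [0, 1]."""
    score = (
        0.08
        + 0.22 * compromised
        + 0.18 * risky_infra
        + 0.17 * impossible_travel
        + 0.13 * off_hours
        + 0.15 * service_sensitivity
        + 0.07 * privileged
    )
    return min(1.0, max(0.0, score))


def success_probability(risk_score):
    """Riskier events are more likely to fail, as in the event generator."""
    return 0.96 - 0.45 * risk_score


# Ordinary event: no risk flags, non-sensitive service (sensitivity 0.25).
benign = expected_risk(0, 0, 0, 0, 0.25, 0)
# Compromised user on risky infrastructure, off-hours, sensitive service.
suspicious = expected_risk(1, 1, 0, 1, 1.0, 0)

print(round(benign, 4), round(success_probability(benign), 4))
print(round(suspicious, 4), round(success_probability(suspicious), 4))
```

The benign event scores about 0.12 and the suspicious one about 0.76. With every indicator set and a sensitive service, the weights sum to 1.0, so the clip only matters once noise is added.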
Building Graph Edges Table
edge_rows = []
for row in events_df.itertuples(index=False):
    user_node = nid("user", row.user)
    device_node = nid("device", row.device)
    ip_node = nid("ip", row.ip)
    service_node = nid("service", row.service)
    role_node = nid("role", row.role)
    geo_node = nid("geo", row.geo)
    home_geo_node = nid("geo", row.home_geo)
    event_node = nid("event", row.event_id)
    base = {
        "event_id": row.event_id,
        "timestamp": row.timestamp,
        "risk_score": row.risk_score,
        "amount": row.amount,
        "is_success": row.is_success,
        "off_hours": row.off_hours,
        "impossible_travel": row.impossible_travel,
        "is_seeded_compromise": row.is_seeded_compromise
    }
    edge_rows.extend([
        {**base, "src": user_node, "dst": device_node, "src_type": "user", "dst_type": "device", "relation": "USES_DEVICE"},
        {**base, "src": user_node, "dst": service_node, "src_type": "user", "dst_type": "service", "relation": "ACCESSES_SERVICE"},
        {**base, "src": device_node, "dst": ip_node, "src_type": "device", "dst_type": "ip", "relation": "CONNECTS_FROM_IP"},
        {**base, "src": ip_node, "dst": geo_node, "src_type": "ip", "dst_type": "geo", "relation": "RESOLVES_TO_GEO"},
        {**base, "src": user_node, "dst": role_node, "src_type": "user", "dst_type": "role", "relation": "HAS_ROLE"},
        {**base, "src": user_node, "dst": home_geo_node, "src_type": "user", "dst_type": "geo", "relation": "HOME_GEO"}
    ])
raw_edges_df = pd.DataFrame(edge_rows)
edges_df = (
raw_edges_df
.groupby(["src", "dst", "relation", "src_type", "dst_type"], as_index=False)
.agg(
event_count=("event_id", "nunique"),
first_seen=("timestamp", "min"),
last_seen=("timestamp", "max"),
max_risk=("risk_score", "max"),
avg_risk=("risk_score", "mean"),
failed_count=("is_success", lambda s: int((~s).sum())),
off_hours_count=("off_hours", "sum"),
impossible_travel_count=("impossible_travel", "sum"),
amount_sum=("amount", "sum"),
seeded_compromise_count=("is_seeded_compromise", "sum")
)
)
edges_df["edge_id"] = [f"edge_{i:05d}" for i in range(len(edges_df))]
edges_df["edge_label"] = edges_df["relation"] + " | n=" + edges_df["event_count"].astype(str)
edges_df["edge_size"] = np.clip(np.log1p(edges_df["event_count"]) * 2.5, 1, 20)
edges_df["edge_title"] = edges_df.apply(
lambda r: (
f"<b>{r['relation']}</b><br>"
f"{r['src']} → {r['dst']}<br>"
f"events: {int(r['event_count'])}<br>"
f"max risk: {r['max_risk']:.3f}<br>"
f"avg risk: {r['avg_risk']:.3f}<br>"
f"failures: {int(r['failed_count'])}<br>"
f"off-hours: {int(r['off_hours_count'])}<br>"
f"impossible-travel: {int(r['impossible_travel_count'])}<br>"
f"amount sum: {r['amount_sum']:.2f}"
),
axis=1
)
edges_df["first_seen"] = edges_df["first_seen"].astype(str)
edges_df["last_seen"] = edges_df["last_seen"].astype(str)
all_node_ids = sorted(set(edges_df["src"]).union(set(edges_df["dst"])))
nodes_df = pd.DataFrame({"id": all_node_ids})
nodes_df["entity_type"] = nodes_df["id"].str.split(":", n=1).str[0]
nodes_df["label"] = nodes_df["id"].str.split(":", n=1).str[1]
touch_src = raw_edges_df[["src", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"src": "id"})
touch_dst = raw_edges_df[["dst", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"dst": "id"})
touches = pd.concat([touch_src, touch_dst], ignore_index=True)
node_stats = (
touches
.groupby("id", as_index=False)
.agg(
touched_events=("event_id", "nunique"),
max_risk=("risk_score", "max"),
avg_risk=("risk_score", "mean"),
failed_touches=("is_success", lambda s: int((~s).sum())),
off_hours_touches=("off_hours", "sum"),
impossible_travel_touches=("impossible_travel", "sum"),
amount_touched=("amount", "sum")
)
)
nodes_df = nodes_df.merge(node_stats, on="id", how="left").fillna({
"touched_events": 0,
"max_risk": 0.0,
"avg_risk": 0.0,
"failed_touches": 0,
"off_hours_touches": 0,
"impossible_travel_touches": 0,
"amount_touched": 0.0
})
We transform raw event records into graph relationships by creating edges between users, devices, IPs, services, roles, and geographies. We aggregate repeated interactions into weighted edges with risk scores, counts, failures, timestamps, and activity summaries. We also create the node table and compute basic node-level statistics from all entity interactions.
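The aggregation step can be illustrated without pandas. The sketch below groups a handful of hypothetical relationship rows by (src, dst, relation) and computes a subset of the same summaries. It assumes each row comes from a distinct event, so the row count stands in for the notebook's unique `event_id` count.

```python
from collections import defaultdict

# Hypothetical raw relationship rows: (src, dst, relation, risk_score, is_success)
raw_rows = [
    ("user:u1", "service:vault", "ACCESSES_SERVICE", 0.20, True),
    ("user:u1", "service:vault", "ACCESSES_SERVICE", 0.90, False),
    ("user:u1", "device:d1", "USES_DEVICE", 0.20, True),
    ("user:u2", "service:vault", "ACCESSES_SERVICE", 0.40, True),
]


def aggregate_edges(rows):
    """Collapse repeated (src, dst, relation) rows into one weighted edge each."""
    groups = defaultdict(list)
    for src, dst, relation, risk, ok in rows:
        groups[(src, dst, relation)].append((risk, ok))
    edges = []
    for (src, dst, relation), observations in groups.items():
        risks = [risk for risk, _ in observations]
        edges.append({
            "src": src,
            "dst": dst,
            "relation": relation,
            "event_count": len(observations),
            "max_risk": max(risks),
            "avg_risk": sum(risks) / len(risks),
            "failed_count": sum(1 for _, ok in observations if not ok),
        })
    return edges


edges = aggregate_edges(raw_rows)
for edge in edges:
    print(edge)
```

Four raw rows collapse into three edges. The repeated user-to-vault access becomes a single edge that carries both its frequency and its worst observed risk.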
Computing Graph Analytics Features
G = nx.DiGraph()
for row in nodes_df.itertuples(index=False):
    G.add_node(row.id, entity_type=row.entity_type, label=row.label)
for row in edges_df.itertuples(index=False):
    G.add_edge(
        row.src,
        row.dst,
        relation=row.relation,
        event_count=float(row.event_count),
        max_risk=float(row.max_risk),
        avg_risk=float(row.avg_risk),
        failed_count=float(row.failed_count),
        amount_sum=float(row.amount_sum)
    )
degree_w = dict(G.degree(weight="event_count"))
in_degree_w = dict(G.in_degree(weight="event_count"))
out_degree_w = dict(G.out_degree(weight="event_count"))
try:
    pagerank = nx.pagerank(G, weight="event_count", max_iter=250)
except Exception:
    pagerank = {n: 0.0 for n in G.nodes()}
try:
    betweenness = nx.betweenness_centrality(G, k=min(90, max(2, G.number_of_nodes())), seed=SEED)
except Exception:
    betweenness = {n: 0.0 for n in G.nodes()}
UG = G.to_undirected()
try:
    communities = list(nx.community.greedy_modularity_communities(UG, weight="event_count"))
except Exception:
    communities = [set(c) for c in nx.connected_components(UG)]
community_map = {}
for cid, members in enumerate(communities):
    for n in members:
        community_map[n] = cid
nodes_df["degree_w"] = nodes_df["id"].map(degree_w).fillna(0.0)
nodes_df["in_degree_w"] = nodes_df["id"].map(in_degree_w).fillna(0.0)
nodes_df["out_degree_w"] = nodes_df["id"].map(out_degree_w).fillna(0.0)
nodes_df["pagerank"] = nodes_df["id"].map(pagerank).fillna(0.0)
nodes_df["betweenness"] = nodes_df["id"].map(betweenness).fillna(0.0)
nodes_df["community"] = nodes_df["id"].map(community_map).fillna(-1).astype(int)
risk_bins = [-0.001, 0.35, 0.65, 0.85, 1.001]
risk_labels = ["low", "medium", "high", "critical"]
nodes_df["risk_band"] = pd.cut(nodes_df["max_risk"], bins=risk_bins, labels=risk_labels).astype(str)
feature_cols = [
"touched_events",
"max_risk",
"avg_risk",
"failed_touches",
"off_hours_touches",
"impossible_travel_touches",
"amount_touched",
"degree_w",
"in_degree_w",
"out_degree_w",
"pagerank",
"betweenness"
]
X_num = nodes_df[feature_cols].replace([np.inf, -np.inf], 0).fillna(0.0)
X_scaled = StandardScaler().fit_transform(X_num)
iso = IsolationForest(
n_estimators=250,
contamination=0.10,
random_state=SEED
)
iso.fit(X_scaled)
nodes_df["anomaly_score"] = -iso.score_samples(X_scaled)
nodes_df["is_anomaly"] = iso.predict(X_scaled) == -1
type_color_map = {
"user": "#1f77b4",
"device": "#ff7f0e",
"ip": "#2ca02c",
"service": "#9467bd",
"role": "#8c564b",
"geo": "#17becf",
"event": "#7f7f7f"
}
nodes_df["node_color"] = nodes_df["entity_type"].map(type_color_map).fillna("#999999")
nodes_df.loc[nodes_df["risk_band"].eq("critical"), "node_color"] = "#d62728"
nodes_df.loc[nodes_df["is_anomaly"], "node_color"] = "#000000"
size_raw = (
8
+ 6 * np.log1p(nodes_df["degree_w"].astype(float))
+ 10 * nodes_df["pagerank"].astype(float) / max(nodes_df["pagerank"].max(), 1e-9)
+ 8 * nodes_df["is_anomaly"].astype(int)
)
nodes_df["node_size"] = np.clip(size_raw, 5, 60)
model_features = pd.concat([
nodes_df[feature_cols + ["anomaly_score"]].replace([np.inf, -np.inf], 0).fillna(0.0),
pd.get_dummies(nodes_df[["entity_type", "risk_band"]], dtype=float)
], axis=1)
try:
    import umap
    reducer = umap.UMAP(
        n_components=2,
        n_neighbors=min(18, max(2, len(nodes_df) - 1)),
        min_dist=0.08,
        metric="euclidean",
        random_state=SEED
    )
    emb = reducer.fit_transform(StandardScaler().fit_transform(model_features))
    layout_name = "UMAP"
except Exception:
    reducer = PCA(n_components=2, random_state=SEED)
    emb = reducer.fit_transform(StandardScaler().fit_transform(model_features))
    layout_name = "PCA fallback"
nodes_df["x"] = emb[:, 0].astype(float)
nodes_df["y"] = emb[:, 1].astype(float)
nodes_df["point_title"] = nodes_df.apply(
lambda r: (
f"<b>{r['id']}</b><br>"
f"type: {r['entity_type']}<br>"
f"community: {int(r['community'])}<br>"
f"risk band: {r['risk_band']}<br>"
f"max risk: {r['max_risk']:.3f}<br>"
f"avg risk: {r['avg_risk']:.3f}<br>"
f"weighted degree: {r['degree_w']:.1f}<br>"
f"pagerank: {r['pagerank']:.6f}<br>"
f"betweenness: {r['betweenness']:.6f}<br>"
f"anomaly score: {r['anomaly_score']:.4f}<br>"
f"is anomaly: {bool(r['is_anomaly'])}"
),
axis=1
)
print("\nGraph summary:")
print(f"Events: {len(events_df):,}")
print(f"Raw relationship rows: {len(raw_edges_df):,}")
print(f"Aggregated edges: {len(edges_df):,}")
print(f"Nodes: {len(nodes_df):,}")
print(f"Communities: {len(communities):,}")
print(f"External layout: {layout_name}")
print("\nNode type counts:")
display(nodes_df["entity_type"].value_counts().rename_axis("entity_type").reset_index(name="count"))
print("\nRisk band counts:")
display(nodes_df["risk_band"].value_counts().rename_axis("risk_band").reset_index(name="count"))
print("\nTop 20 anomalous nodes:")
top_anomalies = (
nodes_df
.sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
[["id", "entity_type", "risk_band", "is_anomaly", "anomaly_score", "max_risk", "avg_risk", "degree_w", "pagerank", "community"]]
.head(20)
)
display(top_anomalies)
print("\nTop 20 risky relationships:")
top_edges = (
edges_df
.sort_values(["max_risk", "failed_count", "event_count"], ascending=[False, False, False])
[["src", "dst", "relation", "event_count", "max_risk", "avg_risk", "failed_count", "off_hours_count", "impossible_travel_count", "amount_sum"]]
.head(20)
)
display(top_edges)
fig = plt.figure(figsize=(9, 5))
plt.hist(nodes_df["anomaly_score"], bins=30)
plt.title("Node Anomaly Score Distribution")
plt.xlabel("Anomaly score")
plt.ylabel("Node count")
plt.show()
fig = plt.figure(figsize=(9, 5))
nodes_df.groupby("entity_type")["max_risk"].mean().sort_values().plot(kind="bar")
plt.title("Mean Max Risk by Entity Type")
plt.xlabel("Entity type")
plt.ylabel("Mean max risk")
plt.xticks(rotation=45, ha="right")
plt.show()
We build a NetworkX graph from the generated nodes and edges and compute advanced graph analytics. We calculate weighted degree, PageRank, betweenness centrality, communities, risk bands, anomaly scores, and machine-learning-based layout embeddings. We then inspect graph summaries, top anomalous nodes, risky relationships, and risk distributions through tables and plots.
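The risk banding and node coloring rules are easy to misread when they are spread across several DataFrame assignments, so here is a compact restatement in plain Python. It is a sketch: `risk_band` and `node_color` are illustrative names, the color map is a subset of the notebook's, and scores are assumed to lie in [0, 1]. The bands are right-inclusive, which matches the default behavior of `pd.cut`.

```python
from bisect import bisect_left

# Upper edges of the low, medium, and high bands; anything above 0.85 is critical.
BAND_UPPER_EDGES = [0.35, 0.65, 0.85]
BAND_LABELS = ["low", "medium", "high", "critical"]

# Subset of the notebook's entity-type palette, for illustration only.
TYPE_COLORS = {"user": "#1f77b4", "device": "#ff7f0e", "ip": "#2ca02c"}


def risk_band(max_risk):
    """Right-inclusive banding: 0.35 is still 'low', 0.85 is still 'high'."""
    return BAND_LABELS[bisect_left(BAND_UPPER_EDGES, max_risk)]


def node_color(entity_type, max_risk, is_anomaly):
    """Apply the overrides in the notebook's order: type, then critical, then anomaly."""
    color = TYPE_COLORS.get(entity_type, "#999999")
    if risk_band(max_risk) == "critical":
        color = "#d62728"
    if is_anomaly:
        color = "#000000"
    return color


print(risk_band(0.35), risk_band(0.36), risk_band(0.85), risk_band(0.90))
print(node_color("user", 0.20, False))
print(node_color("user", 0.90, False))
print(node_color("user", 0.90, True))
```

Because the anomaly override runs last, an anomalous node is drawn black even when it is also critical, so the red color only marks critical nodes that the Isolation Forest did not flag.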
Building Interactive Graph Visualizations
base_g = (
graphistry
.bind(source="src", destination="dst", node="id")
.edges(edges_df)
.nodes(nodes_df)
.bind(
edge="edge_id",
edge_title="edge_title",
edge_label="edge_label",
edge_weight="event_count",
edge_size="edge_size",
point_title="point_title",
point_label="label",
point_color="node_color",
point_size="node_size",
point_x="x",
point_y="y"
)
.settings(url_params={"play": 0, "info": "true"})
)
print("\nConstructed a PyGraphistry Plotter named base_g.")
print("It binds src/dst edges, node attributes, titles, labels, sizes, colors, and external x/y layout.")
try:
    dot_text = base_g.plot_static(engine="graphviz-dot", reuse_layout=True)
    dot_path = OUT_DIR / "graph_static.dot"
    with open(dot_path, "w") as f:
        f.write(dot_text if isinstance(dot_text, str) else str(dot_text))
    print("Saved DOT representation:", dot_path)
except Exception as e:
    print("Static DOT export skipped:", repr(e))
def show_pyvis(nodes, edges, output_path, height="780px"):
    nodes_small = nodes.copy()
    edges_small = edges.copy()
    max_nodes = 320
    if len(nodes_small) > max_nodes:
        keep = set(
            nodes_small
            .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
            .head(max_nodes)["id"]
        )
        nodes_small = nodes_small[nodes_small["id"].isin(keep)]
        edges_small = edges_small[edges_small["src"].isin(keep) & edges_small["dst"].isin(keep)]
    net = Network(
        height=height,
        width="100%",
        directed=True,
        notebook=True,
        cdn_resources="in_line"
    )
    net.barnes_hut(gravity=-25000, central_gravity=0.2, spring_length=160, spring_strength=0.04, damping=0.92)
    for row in nodes_small.itertuples(index=False):
        title = str(row.point_title).replace("<br>", "\n").replace("<b>", "").replace("</b>", "")
        net.add_node(
            row.id,
            label=str(row.label),
            title=title,
            group=str(row.entity_type),
            value=float(row.node_size)
        )
    for row in edges_small.itertuples(index=False):
        title = str(row.edge_title).replace("<br>", "\n").replace("<b>", "").replace("</b>", "")
        net.add_edge(
            row.src,
            row.dst,
            title=title,
            label=str(row.relation) if row.max_risk >= 0.90 else "",
            value=float(max(1.0, row.edge_size))
        )
    net.write_html(str(output_path), notebook=False)
    display(HTML(filename=str(output_path)))
    print("Saved local interactive HTML:", output_path)
local_full_html = OUT_DIR / "local_full_graph.html"
show_pyvis(nodes_df, edges_df, local_full_html)
seed_node = (
nodes_df
.sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
.iloc[0]["id"]
)
ego = nx.ego_graph(G.to_undirected(), seed_node, radius=2)
ego_nodes = set(ego.nodes())
ego_edges_df = edges_df[edges_df["src"].isin(ego_nodes) & edges_df["dst"].isin(ego_nodes)].copy()
ego_nodes_df = nodes_df[nodes_df["id"].isin(ego_nodes)].copy()
print("\nFocused investigation seed node:", seed_node)
print(f"Ego subgraph nodes: {len(ego_nodes_df):,}")
print(f"Ego subgraph edges: {len(ego_edges_df):,}")
display(
ego_nodes_df
.sort_values(["is_anomaly", "anomaly_score", "max_risk"], ascending=[False, False, False])
[["id", "entity_type", "risk_band", "is_anomaly", "anomaly_score", "max_risk", "degree_w", "pagerank", "community"]]
.head(30)
)
ego_g = (
graphistry
.bind(source="src", destination="dst", node="id")
.edges(ego_edges_df)
.nodes(ego_nodes_df)
.bind(
edge="edge_id",
edge_title="edge_title",
edge_label="edge_label",
edge_weight="event_count",
edge_size="edge_size",
point_title="point_title",
point_label="label",
point_color="node_color",
point_size="node_size",
point_x="x",
point_y="y"
)
.settings(url_params={"play": 0, "info": "true"})
)
local_ego_html = OUT_DIR / "local_ego_investigation_graph.html"
show_pyvis(ego_nodes_df, ego_edges_df, local_ego_html)
risky_edges_df = edges_df[
(edges_df["max_risk"] >= 0.85)
| (edges_df["failed_count"] >= edges_df["failed_count"].quantile(0.95))
| (edges_df["impossible_travel_count"] > 0)
].copy()
risky_node_ids = set(risky_edges_df["src"]).union(set(risky_edges_df["dst"]))
risky_nodes_df = nodes_df[nodes_df["id"].isin(risky_node_ids)].copy()
risky_g = (
graphistry
.bind(source="src", destination="dst", node="id")
.edges(risky_edges_df)
.nodes(risky_nodes_df)
.bind(
edge="edge_id",
edge_title="edge_title",
edge_label="edge_label",
edge_weight="event_count",
edge_size="edge_size",
point_title="point_title",
point_label="label",
point_color="node_color",
point_size="node_size",
point_x="x",
point_y="y"
)
.settings(url_params={"play": 0, "info": "true"})
)
print("\nHigh-risk filtered graph:")
print(f"Risky nodes: {len(risky_nodes_df):,}")
print(f"Risky edges: {len(risky_edges_df):,}")
local_risky_html = OUT_DIR / "local_high_risk_graph.html"
show_pyvis(risky_nodes_df, risky_edges_df, local_risky_html)
We create PyGraphistry plot objects by binding source and destination nodes, node IDs, labels, colors, sizes, tooltips, and layout coordinates. We also generate local PyVis HTML visualizations so we can inspect the full graph, a focused ego investigation graph, and a high-risk filtered graph without requiring Graphistry credentials. We use these views to transition from broad graph exploration to targeted investigation of suspicious entities.
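The ego investigation view keeps every node within two hops of the seed, ignoring edge direction, and then keeps only the edges whose endpoints both survive. The sketch below shows that selection with a breadth-first search in plain Python. The `ego_nodes` helper and the toy edge list are hypothetical; the notebook itself relies on `nx.ego_graph` for this step.

```python
from collections import deque


def ego_nodes(edges, seed, radius=2):
    """Return the nodes within `radius` hops of `seed`, ignoring edge direction."""
    neighbors = {}
    for src, dst in edges:
        neighbors.setdefault(src, set()).add(dst)
        neighbors.setdefault(dst, set()).add(src)
    seen = {seed}
    queue = deque([(seed, 0)])
    while queue:
        node, dist = queue.popleft()
        if dist == radius:
            continue  # do not expand beyond the radius
        for nxt in neighbors.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, dist + 1))
    return seen


# Toy directed edges in the same typed-ID style as the tutorial graph.
toy_edges = [
    ("user:u1", "device:d1"),
    ("device:d1", "ip:10.0.1.5"),
    ("ip:10.0.1.5", "geo:US"),
    ("user:u2", "device:d1"),
]

keep = ego_nodes(toy_edges, "user:u1", radius=2)
ego_edges = [(s, d) for s, d in toy_edges if s in keep and d in keep]
print(sorted(keep))
print(ego_edges)
```

Here `user:u2` is pulled in because it shares a device with the seed, while `geo:US` sits three hops away and is dropped. That shared-infrastructure effect is what makes a radius-2 neighborhood useful for investigation.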
Exporting Hypergraphs and Artifacts
try:
    hypergraph_input = events_df[[
        "event_id", "user", "device", "ip", "service", "role", "geo",
        "risk_score", "amount", "is_success", "off_hours", "impossible_travel"
    ]].head(450).copy()
    hg = graphistry.hypergraph(
        hypergraph_input,
        ["user", "device", "ip", "service", "role", "geo"]
    )
    hyper_g = hg["graph"]
    print("\nConstructed a PyGraphistry hypergraph from raw event rows.")
    print("Hypergraph keys:", list(hg.keys()))
except Exception as e:
    hyper_g = None
    print("\nHypergraph transform skipped:", repr(e))
if REGISTERED:
    print("\nUploading interactive visualizations to Graphistry...")
    try:
        full_url = base_g.plot(render=False)
        print("Full graph URL:", full_url)
        display(IFrame(full_url, width="100%", height=780))
    except Exception as e:
        print("Full Graphistry upload failed:", repr(e))
    try:
        ego_url = ego_g.plot(render=False)
        print("Ego investigation graph URL:", ego_url)
        display(IFrame(ego_url, width="100%", height=780))
    except Exception as e:
        print("Ego Graphistry upload failed:", repr(e))
    try:
        risky_url = risky_g.plot(render=False)
        print("High-risk graph URL:", risky_url)
        display(IFrame(risky_url, width="100%", height=780))
    except Exception as e:
        print("Risky Graphistry upload failed:", repr(e))
    if hyper_g is not None:
        try:
            hyper_url = hyper_g.plot(render=False)
            print("Hypergraph URL:", hyper_url)
            display(IFrame(hyper_url, width="100%", height=780))
        except Exception as e:
            print("Hypergraph Graphistry upload failed:", repr(e))
else:
    print("\nGraphistry upload skipped because credentials are not configured.")
    print("Local HTML visualizations were still generated and displayed.")
events_path = OUT_DIR / "events.csv"
raw_edges_path = OUT_DIR / "raw_edges.parquet"
edges_path = OUT_DIR / "aggregated_edges.parquet"
nodes_path = OUT_DIR / "nodes.parquet"
summary_path = OUT_DIR / "investigation_summary.json"
gexf_path = OUT_DIR / "enterprise_access_graph.gexf"
events_df.to_csv(events_path, index=False)
raw_edges_df.to_parquet(raw_edges_path, index=False)
edges_df.to_parquet(edges_path, index=False)
nodes_df.to_parquet(nodes_path, index=False)
nx.write_gexf(G, gexf_path)
summary = {
"events": int(len(events_df)),
"raw_edges": int(len(raw_edges_df)),
"aggregated_edges": int(len(edges_df)),
"nodes": int(len(nodes_df)),
"communities": int(len(communities)),
"layout": layout_name,
"seed_node_for_ego_investigation": seed_node,
"compromised_users": sorted(compromised_users),
"risky_devices": sorted(risky_devices),
"risky_ips": sorted(risky_ips),
"top_anomalies": top_anomalies.to_dict(orient="records"),
"top_risky_edges": top_edges.to_dict(orient="records"),
"outputs": {
"events_csv": str(events_path),
"raw_edges_parquet": str(raw_edges_path),
"aggregated_edges_parquet": str(edges_path),
"nodes_parquet": str(nodes_path),
"gexf": str(gexf_path),
"local_full_graph_html": str(local_full_html),
"local_ego_graph_html": str(local_ego_html),
"local_high_risk_graph_html": str(local_risky_html)
}
}
with open(summary_path, "w") as f:
    json.dump(summary, f, indent=2)
print("\nSaved tutorial artifacts:")
for k, v in summary["outputs"].items():
    print(f"{k}: {v}")
print("summary_json:", summary_path)
print("\nNotebook next steps:")
print("1. Open the local HTML graphs above to inspect communities, anomalies, risky IPs, and suspicious user-service paths.")
print("2. Add Graphistry credentials as Colab secrets to enable GPU-backed Graphistry Hub uploads.")
print("3. Replace the synthetic events_df with your own access logs, transactions, security alerts, or entity relationship table.")
print("4. Keep the same edges_df/nodes_df schema to reuse the analytics and visualization pipeline.")
We create a PyGraphistry hypergraph from raw event rows, illustrating another way to convert tabular data into graph form. We optionally upload the full graph, ego graph, risky graph, and hypergraph to Graphistry Hub when credentials are available. We finally export all important artifacts, including CSV, Parquet, GEXF, HTML, and JSON files, so that we can reuse the results for further analysis.
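Conceptually, the hypergraph transform treats each event row as a hub node and links it to one node per entity value in the selected columns. The plain-Python sketch below illustrates that idea only. The `rows_to_hyperedges` helper and the `column::value` naming are assumptions made for illustration and should not be read as the exact schema that `graphistry.hypergraph` returns.

```python
def rows_to_hyperedges(rows, entity_cols, id_col="event_id"):
    """Link each event row (hub node) to one node per entity value (illustrative only)."""
    edges = []
    for row in rows:
        event_node = f"{id_col}::{row[id_col]}"
        for col in entity_cols:
            # The edge keeps the column name so the relationship type stays visible.
            edges.append((event_node, f"{col}::{row[col]}", col))
    return edges


# Two hypothetical event rows that share the same user.
sample_rows = [
    {"event_id": "evt_00001", "user": "user_001", "device": "device_003", "service": "vault"},
    {"event_id": "evt_00002", "user": "user_001", "device": "device_010", "service": "jira"},
]

hyperedges = rows_to_hyperedges(sample_rows, ["user", "device", "service"])
for edge in hyperedges:
    print(edge)
```

Two rows with three entity columns yield six edges, and both event hubs connect to the same `user::user_001` node. Shared entities like this are what turn separate log rows into connected structure.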
Conclusion
We completed an end-to-end PyGraphistry pipeline that transforms raw event-style data into a fully enriched, analyzable graph. We constructed meaningful relationships, computed graph features, identified anomalous entities, created focused investigation subgraphs, and exported reusable artifacts for further analysis. We also kept the workflow flexible by supporting both local HTML visualization and optional Graphistry Hub uploads, so it runs in Google Colab with or without credentials. The result is a strong foundation for applying PyGraphistry to real-world use cases such as fraud detection, cybersecurity investigation, access monitoring, entity resolution, and graph-based risk intelligence.
The post PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation appeared first on MarkTechPost.
