-
Notifications
You must be signed in to change notification settings - Fork 3
/
onboarding_with_webhooks.py
81 lines (63 loc) · 2.78 KB
/
onboarding_with_webhooks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import secrets
from typing import Optional
from meiga.assertions import assert_success
from alice import Config, Webhook, Webhooks
RESOURCES_PATH = f"{os.path.dirname(os.path.abspath(__file__))}/../resources"
def configure_webhooks(api_key: str, verbose: Optional[bool] = False) -> None:
config = Config(api_key=api_key, verbose=verbose)
webhooks_client = Webhooks.from_config(config)
# Check Available events
subscriptable_events = webhooks_client.get_subscriptable_events().unwrap_or_raise()
selected_event = subscriptable_events[0]
# Create a new Webhook
webhook = Webhook(
active=True,
post_url="http://google.com",
api_key="b0b905d6-228f-44bf-a130-c85d7aecd765",
event_name=selected_event.get("name"),
event_version=selected_event.get("version"),
secret=str(secrets.token_hex(20)),
)
webhook_id = webhooks_client.create_webhook(webhook).unwrap_or_raise()
# Update an existent Webhook
webhook_to_update = Webhook(
webhook_id=webhook_id, # Needed if we want to update
active=False,
post_url="http://alicebiometrics.com",
api_key="b0b905d6-228f-44bf-a130-c85d7aecd765",
event_name="user_created",
event_version="1",
algorithm="sha256",
secret=str(secrets.token_hex(20)),
)
webhooks_client.update_webhook(webhook_to_update).unwrap_or_raise()
# Update the activation of a Webhook
webhooks_client.update_webhook_activation(webhook_id, True)
retrieved_webhook = webhooks_client.get_webhook(webhook_id).unwrap_or_raise()
assert retrieved_webhook.active
# Send a ping using configured webhook
webhooks_client.ping_webhook(webhook_id).unwrap_or_raise()
# Retrieve an existent Webhook
retrieved_webhook = webhooks_client.get_webhook(webhook_id).unwrap_or_raise()
assert retrieved_webhook.active
assert retrieved_webhook.post_url == "http://alicebiometrics.com"
# Retrieve all configured webhooks
retrieved_webhooks = webhooks_client.get_webhooks().unwrap_or_raise()
# Delete a configured webhook
result = webhooks_client.delete_webhook(webhook_id)
assert_success(result)
# Retrieve all webhook results of a specific webhook
webhook_results = webhooks_client.get_webhook_results(webhook_id).unwrap_or_raise()
# Retrieve las webhook result of a specific webhook
last_webhook_result = webhooks_client.get_last_webhook_result(
webhook_id
).unwrap_or_raise()
if __name__ == "__main__":
api_key = os.environ.get("ONBOARDING_API_KEY")
if api_key is None:
raise AssertionError(
"Please configure your ONBOARDING_API_KEY to run the example"
)
print("Running webhook configuration...")
configure_webhooks(api_key=api_key, verbose=True)