Clear History

This commit is contained in:
2023-05-11 17:44:13 +02:00
commit 4acb9b9ae8
77 changed files with 5363 additions and 0 deletions

99
code/src/kinds/link.py Normal file
View File

@@ -0,0 +1,99 @@
import kopf
import logging
import templates
import k8s
import utils
RESOURCE = 'links.' + utils.GROUP
def get_link_metadata(body):
try:
name = body['metadata']['name']
sender = body['spec']['from']
receiver = body['spec']['to']
direction = body['spec']['direction']
if direction not in ['uni', 'bi']:
raise kopf.PermanentError(f'Invalid direction "{direction}" for link "{name}"')
unidirectional = direction == 'uni'
namespace = body['metadata']['namespace']
return namespace, name, sender, receiver, unidirectional
except KeyError as e:
raise kopf.PermanentError(f'Invalid link "{name}"') from e
def build_labels(node, value):
return {"metadata": {"labels": {
f"send-node-{node}": value,
f"receive-node-{node}": value,
}}}
@kopf.on.create(RESOURCE)
def create(body, **kwargs) -> None:
namespace, name, sender, receiver, unidirectional = get_link_metadata(body)
logging.info(f'Link "{name}" ({sender} -> {receiver}) is created on namespace "{namespace}"')
logging.debug(body)
# Chaos experiment
logging.info(
f'Creating link "{name}" ({sender} -> {receiver}) on namespace "{namespace}" with parameters "{body.spec}"')
direction = 'to' if unidirectional else 'both'
crd = templates.chaos_link(name=name, namespace=namespace, sender=sender, receiver=receiver, direction=direction)
# Check if there is a fault to apply
has_fault = False
if 'bandwidth' in body.spec:
has_fault = True
crd['spec']['action'] = 'bandwidth'
crd['spec']['bandwidth'] = body.spec['bandwidth']
else:
crd['spec']['action'] = 'netem'
for action in ['delay', 'loss', 'duplicate', 'corrupt']:
if action in body.spec:
has_fault = True
crd['spec'][action] = body.spec[action]
if has_fault:
# Only create the chaos experiment if there is a fault to apply
group, version = crd['apiVersion'].split('/')
kopf.adopt(crd)
k8s.crd.create_namespaced_custom_object(group, version, namespace, 'networkchaos', crd)
# Label the pods to enable the service and exception to the network policy
pod_sender = k8s.core.list_namespaced_pod(namespace=namespace, label_selector=f'node={sender}')
for pod in pod_sender.items:
patch = build_labels(receiver, "enabled")
if unidirectional:
del patch["metadata"]["labels"][f"receive-node-{receiver}"]
k8s.core.patch_namespaced_pod(pod.metadata.name, namespace, patch)
pod_receiver = k8s.core.list_namespaced_pod(namespace=namespace, label_selector=f'node={receiver}')
for pod in pod_receiver.items:
patch = build_labels(sender, "enabled")
if unidirectional:
del patch["metadata"]["labels"][f"send-node-{sender}"]
k8s.core.patch_namespaced_pod(pod.metadata.name, namespace, patch)
@ kopf.on.delete(RESOURCE)
def delete(body, **kwargs):
namespace, name, sender, receiver, unidirectional = get_link_metadata(body)
logging.info(f'Link "{name}" ({sender} -> {receiver}) is deleted on namespace "{namespace}"')
logging.debug(body)
# Reset labels
pod_sender = k8s.core.list_namespaced_pod(namespace=namespace, label_selector=f'node={sender}')
for pod in pod_sender.items:
patch = build_labels(receiver, None)
if unidirectional:
del patch["metadata"]["labels"][f"receive-node-{receiver}"]
k8s.core.patch_namespaced_pod(pod.metadata.name, namespace, patch)
pod_receiver = k8s.core.list_namespaced_pod(namespace=namespace, label_selector=f'node={receiver}')
for pod in pod_receiver.items:
patch = build_labels(sender, None)
if unidirectional:
del patch["metadata"]["labels"][f"send-node-{sender}"]
k8s.core.patch_namespaced_pod(pod.metadata.name, namespace, patch)

55
code/src/kinds/node.py Normal file
View File

@@ -0,0 +1,55 @@
import kopf
import logging
import json
import templates
import k8s
import utils
RESOURCE = 'nodes.' + utils.GROUP
@kopf.on.create(RESOURCE)
def create(body, **kwargs):
name = body['metadata']['name']
namespace = body['metadata']['namespace']
logging.info(f'Node "{name}" is created on namespace "{namespace}"')
logging.debug(body)
try:
resources = json.dumps(body['spec']['resources'])
except KeyError:
resources = '{}'
try:
airGapped = body['spec']['airGapped'] == True
except KeyError:
airGapped = True
deployment, service, policy = templates.native_node(id=name, image=body['spec']['image'], resources=resources)
if not airGapped:
obj = {"ipBlock": {
"cidr": "0.0.0.0/0",
"except": ["10.0.0.0/8"]
}}
policy['spec']['ingress'].append({'from': [obj]})
policy['spec']['egress'].append({'to': [obj]})
logging.debug(deployment)
logging.debug(service)
logging.debug(policy)
kopf.adopt(deployment)
kopf.adopt(service)
kopf.adopt(policy)
k8s.apps.create_namespaced_deployment(namespace, deployment)
k8s.core.create_namespaced_service(namespace, service)
k8s.networking.create_namespaced_network_policy(namespace, policy)
@kopf.on.delete(RESOURCE)
def delete(body, **kwargs):
name = body['metadata']['name']
logging.info(f'Node "{name}" is deleted')
logging.debug(body)

141
code/src/kinds/scenario.py Normal file
View File

@@ -0,0 +1,141 @@
import asyncio
import logging
import kopf
import k8s
import templates
import utils
RESOURCE = 'scenarios.' + utils.GROUP
@kopf.on.create(RESOURCE)
def create(body, patch, **kwargs):
name = body['metadata']['name']
namespace = body['metadata']['namespace']
logging.info(f'scenario "{name}" is created on namespace "{namespace}"')
@kopf.on.delete(RESOURCE)
def delete(body, **kwargs):
name = body['metadata']['name']
logging.info(f'scenario "{name}" is deleted')
@kopf.daemon(RESOURCE, cancellation_timeout=1)
async def daemon(meta, status, spec, **kwargs):
try:
while True:
try:
if status['ended']:
return
except KeyError:
pass
name = meta['name']
namespace = meta['namespace']
def patch(body):
k8s.crd.patch_namespaced_custom_object(
utils.GROUP, utils.VERSION, namespace, 'scenarios', name, {'status': body})
def with_prefix(id: str) -> str:
return f'{name}-{id}'
now = utils.timestamp_ms()
try:
started = status['started']
except KeyError:
patch({'started': now})
logging.info('waiting for scenario to start...')
await asyncio.sleep(0.1)
continue
# Time since the scenario started
elapsed = now - started
logging.info(f'elapsed: {elapsed}')
logging.debug(status)
# Calculate when the next run should be executed, based on future events
next_run = None
for i, event in enumerate(spec['events']):
offset = (event['offset'] or 0) * 1000
delta = offset - elapsed
if delta > 0:
if next_run is None or delta < next_run:
next_run = delta
continue
try:
s = status['events'][str(i)]
executed = s['executed']
except KeyError:
executed = False
if not executed:
# Execute event
logging.info(f'executing event {i}')
patch({'events': {i: {'executed': True, 'timestamp': now}}})
action = event['action']
if action == 'end':
# End scenario
logging.info(f'ending scenario {event}')
patch({'ended': now})
return
# NODE
elif event['resource'] == 'node':
ID = with_prefix(event['id'])
if action == 'create':
# Create node
logging.info(f'creating node {event}')
body = templates.iluzio_node(id=ID)
body['spec'] = event['spec']
kopf.adopt(body)
k8s.crd.create_namespaced_custom_object(
utils.GROUP, utils.VERSION, namespace, 'nodes', body)
elif action == 'delete':
# Delete node
logging.info(f'deleting node {event}')
k8s.crd.delete_namespaced_custom_object(
utils.GROUP, utils.VERSION, namespace, 'nodes', ID)
else:
logging.error(f'unknown action {action}')
# LINK
elif event['resource'] == 'link':
ID = '-'.join(map(lambda x: event[x], ['from', 'to', 'direction']))
ID = with_prefix(ID)
if action == 'create':
# Create links
logging.info(f'creating link {event}')
body = templates.iluzio_link(id=ID)
body['spec'].update(event['spec'])
body['spec']['from'] = with_prefix(event['from'])
body['spec']['to'] = with_prefix(event['to'])
body['spec']['direction'] = event['direction']
kopf.adopt(body)
k8s.crd.create_namespaced_custom_object(
utils.GROUP, utils.VERSION, namespace, 'links', body)
elif action == 'delete':
# Delete link
logging.info(f'deleting link {event}')
k8s.crd.delete_namespaced_custom_object(
utils.GROUP, utils.VERSION, namespace, 'links', ID)
else:
logging.error(f'unknown action {action}')
else:
logging.error(f'unknown resource {event["resource"]}')
if next_run is None:
logging.error('no next run could be calculated')
return
logging.info(f'waiting for next run in {next_run} ms')
await asyncio.sleep(next_run / 1000)
except asyncio.CancelledError:
logging.info('cancellation requested')