Flink improvements rocksdb state pr #406

Merged

12 changes: 6 additions & 6 deletions FlinkSqlGateway/submitjob/job.py
@@ -30,13 +30,13 @@
print(error)

# Create SETs
if 'sqlsets' in d:
sets = d['sqlsets']
if 'sqlsettings' in d:
sets = d['sqlsettings']
for set in sets:
v = set.replace('=', ' ').split(' ')
key = v[1]
value = v[-1].strip(';').strip('\'')
print(f'SET: {key}={value}')
keys = list(set.keys())
key = keys[0]
value = set[key]
print(f'SET: {key}, {value}')
table_env.get_config().set(key, value)

# Create Tables
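For orientation, this is roughly how the gateway now consumes the dict-based `sqlsettings` list shown above; a minimal sketch assuming a submitted payload `d` shaped like the one the operator produces (names and values are illustrative, not taken from the PR):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical payload: settings now arrive as single-key dicts instead of
# raw "SET key = 'value';" strings that had to be re-parsed with split/strip.
d = {'sqlsettings': [{'pipeline.name': 'iff/my-statementset'},
                     {'table.exec.state.ttl': '86400000 ms'}]}

for setting in d.get('sqlsettings', []):
    key = list(setting.keys())[0]   # each entry carries exactly one key
    value = setting[key]
    print(f'SET: {key}, {value}')
    table_env.get_config().set(key, value)
```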
34 changes: 20 additions & 14 deletions FlinkSqlServicesOperator/beamsqlstatementsetoperator.py
@@ -40,10 +40,10 @@
DEFAULT_TIMEOUT = 60

timer_interval_seconds = int(os.getenv("TIMER_INTERVAL", default="10"))
timer_backoff_seconds = int(os.getenv("TIMER_BACKOFF_INTERVAL", default="10"))
timer_backoff_seconds = int(os.getenv("TIMER_BACKOFF_INTERVAL", default="30"))
timer_backoff_temp_failure_seconds = int(
os.getenv("TIMER_BACKOFF_TEMPORARY_FAILURE_INTERVAL", default="30"))

monitor_retries = int(os.getenv("MONITOR_RETRIES", default="10000000"))

class States(Enum):
"""SQL Job states as defined by Flink"""
@@ -258,7 +258,7 @@ def update(body, spec, patch, logger, retries=20, **kwargs):


@kopf.timer("industry-fusion.com", "v1alpha3", "beamsqlstatementsets",
interval=timer_interval_seconds, backoff=timer_backoff_seconds)
interval=timer_interval_seconds, backoff=timer_backoff_seconds, retries=monitor_retries)
# pylint: disable=too-many-arguments unused-argument redefined-outer-name
# pylint: disable=too-many-locals too-many-statements too-many-branches
# Kopf decorated functions match their expectations
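A minimal, hypothetical kopf timer illustrating the decorator parameters the operator now sets; the very large MONITOR_RETRIES default keeps the periodic monitor being re-invoked across repeated failures instead of letting kopf give up (sketch only, not the full handler):

```python
import os
import kopf

timer_interval_seconds = int(os.getenv("TIMER_INTERVAL", default="10"))
timer_backoff_seconds = int(os.getenv("TIMER_BACKOFF_INTERVAL", default="30"))
monitor_retries = int(os.getenv("MONITOR_RETRIES", default="10000000"))


@kopf.timer("industry-fusion.com", "v1alpha3", "beamsqlstatementsets",
            interval=timer_interval_seconds,
            backoff=timer_backoff_seconds,
            retries=monitor_retries)
def monitor(patch, logger, **kwargs):
    # Sketch: the real handler refreshes the Flink job state and updates
    # patch.status; retries bounds how often kopf re-runs it after errors.
    logger.debug("periodic monitoring tick")
```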
@@ -322,7 +322,7 @@ def monitor(beamsqltables: kopf.Index, beamsqlviews: kopf.Index,
# (3) Views
statementset = {}
sets = create_sets(spec, body, namespace, name, logger)
statementset['sqlsets'] = sets
statementset['sqlsettings'] = sets

# get first all table ddls
# get inputTable and outputTable
@@ -404,14 +404,16 @@ def monitor(beamsqltables: kopf.Index, beamsqlviews: kopf.Index,
create(body, spec, patch, logger, **kwargs)
# If state is not INITIALIZED, DEPLOYMENT_FAILURE nor CANCELED,
# the state is monitored
if state not in [States.CANCELED.name,
States.CANCELING.name, States.SAVEPOINTING.name, States.UPDATING.name]:
if state not in [States.CANCELING.name, States.SAVEPOINTING.name, States.UPDATING.name]:
refresh_state(body, patch, logger)
if patch.status[STATE] == States.NOT_FOUND.name:
logger.info("Job seems to be lost. Will re-initialize")
patch.status[STATE] = States.INITIALIZED.name
patch.status[JOB_ID] = None

if patch.status[STATE] == States.CANCELED.name:
logger.info("Job is cancelled. Will re-initialize")
patch.status[STATE] = States.INITIALIZED.name
patch.status[JOB_ID] = None

# pylint: disable=too-many-arguments unused-argument redefined-outer-name
# kopf is ingesting too many parameters, this is inherited by the subroutine
@@ -447,17 +449,18 @@ def create_sets(spec, body, namespace, name, logger):
"""
sets = []
sqlsettings = spec.get('sqlsettings')
pipeline_name = {'pipeline.name': f'{namespace}/{name}'}
if not sqlsettings:
message = "pipeline name not determined in"\
f" {namespace}/{name}, using default"
logger.debug(message)
sets.append(f"SET pipeline.name = '{namespace}/{name}';")
sets.append(pipeline_name)
elif all(x for x in sqlsettings if x.get('pipeline.name') is None):
sets.append(f"SET pipeline.name = '{namespace}/{name}';")
sets.append(pipeline_name)
for setting in sqlsettings:
key = list(setting.keys())[0]
value = setting.get(key)
sets.append(f"SET '{key}' = '{value}';")
#key = list(setting.keys())[0]
#value = setting.get(key)
sets.append(setting)
# add savepoint if location is set
try:
savepoint_location = body['status'].get(SAVEPOINT_LOCATION)
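Illustrative only: with the create_sets change above, settings are returned as a list of single-key dicts rather than SQL SET strings, e.g. for a resource in namespace iff named my-statementset with one user-supplied setting (values are hypothetical):

```python
# Before: settings were rendered as SQL strings that job.py had to re-parse.
old_sets = ["SET pipeline.name = 'iff/my-statementset';",
            "SET 'table.exec.state.ttl' = '86400000 ms';"]

# After: each setting is passed through as a single-key dict.
new_sets = [{'pipeline.name': 'iff/my-statementset'},
            {'table.exec.state.ttl': '86400000 ms'}]
```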
@@ -486,14 +489,17 @@ def refresh_state(body, patch, logger):
f"Could not monitor task {job_id}: {exc}",
timer_backoff_temp_failure_seconds) from exc
if job_info is not None:
patch.status[STATE] = job_info.get("state")
state = job_info.get("state")
patch.status[STATE] = state
else:
# API etc works but no job found. Can happen for instance
# after restart of job manager
# In this case, we need to signal that stage
# In this case, we need to redeploy the service
#
patch.status[STATE] = States.UNKNOWN.name



def deploy_statementset(statementset, logger):
"""
deploy statementset to flink SQL gateway
@@ -97,7 +97,7 @@ def submit_statementset_successful(statementset, logger):
# be an object method after mocking
#assert statementset == "SET pipeline.name = 'namespace/name';\nDDL;" \
# "\nBEGIN STATEMENT SET;\nselect;\nEND;"
assert statementset == {'sqlsets': ["SET pipeline.name = 'namespace/name';"],
assert statementset == {'sqlsettings': [{'pipeline.name': 'namespace/name'}],
'tables': ['DDL;'],
'sqlstatementset': ['select;']}
return "job_id"
26 changes: 18 additions & 8 deletions helm/charts/flink/templates/flink-configuration-configmap.yaml
@@ -12,14 +12,17 @@ metadata:
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 5
taskmanager.numberOfTaskSlots: 4
pipeline.max-parallelism: 4
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
rest.flamegraph.enabled: true

queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: {{ .Values.flink.defaultParalellism }}
jobmanager.memory.process.size: 1000m
taskmanager.memory.process.size: 2500m

s3.endpoint: {{ printf "%s://%s" .Values.s3.protocol .Values.s3.endpoint }}
{{- if .Values.minio.enabled }}
s3.path.style.access: true
@@ -30,16 +33,23 @@ data:
{{ else }}
s3.secret-key: {{ .Values.s3.userSecretKey }}
{{ end }}
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/rocksdb
state.backend.incremental: false
state.backend.rocksdb.writebuffer.size: 64 kb
state.backend.rocksdb.compaction.level.target-file-size-base: 64 kb
state.backend.rocksdb.use-bloom-filter: true
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED
state.checkpoints.dir: s3://{{ .Values.flink.bucket }}/{{ .Values.flink.checkpointDir }}
state.savepoints.dir: s3://{{ .Values.flink.bucket }}/{{ .Values.flink.savepointDir }}
kubernetes.cluster-id: {{ .Values.flink.clusterId }}
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability: kubernetes
high-availability.storageDir: s3://{{ .Values.flink.bucket }}/{{ .Values.flink.haDir }}
kubernetes.namespace: {{ .Release.Namespace }}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy: exponential-delay
restart-strategy.exponential-delay.max-backoff: 2 min
execution.checkpointing.interval: {{ .Values.flink.checkpointInterval }}
process.working-dir: '/tmp'
process.working-dir: /tmp/process
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
@@ -44,6 +44,11 @@ spec:
mountPath: /opt/flink/conf/
- mountPath: /tmp
name: tm-tmp-volume
resources:
requests:
memory: "2.5G"
cpu: 1
limits:
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
4 changes: 4 additions & 0 deletions helm/charts/sql-core/templates/core-statementsets.yaml
@@ -9,6 +9,10 @@ metadata:
{{ include (print $.Template.BasePath "/core-tables.yaml") . | sha256sum }}
{{ include (print $.Template.BasePath "/core-kafka-topics.yaml") . | sha256sum }}
spec:
sqlsettings:
- table.exec.state.ttl: '{{.Values.kafkaBridge.debezium.attributesTopicRetention}} ms'
- execution.savepoint.ignore-unclaimed-state: 'true'
- pipeline.object-reuse: 'true'
sqlstatements:
- |-
/* validates changes in alerts_bulk and only forward if there are specific changes
24 changes: 21 additions & 3 deletions semantic-model/shacl2flink/Makefile
@@ -15,10 +15,10 @@ sqlite_files = $(OUTPUTDIR)/core.sqlite $(OUTPUTDIR)/ngsild.sqlite $(OUTPUTDIR)/

build: $(SHACL) $(KNOWLEDGE) $(MODEL)
@echo Build tables
$(PYTHON) create_rdf_table.py $(KNOWLEDGE)
$(PYTHON) create_rdf_table.py $(KNOWLEDGE) $(KAFKA_TOPICS)
$(PYTHON) create_core_tables.py
${PYTHON} create_udfs.py
$(PYTHON) create_ngsild_tables.py $(SHACL)
$(PYTHON) create_ngsild_tables.py $(SHACL) $(KAFKA_TOPICS)
$(PYTHON) create_ngsild_models.py $(SHACL) $(KNOWLEDGE) $(MODEL)
$(PYTHON) create_sql_checks_from_shacl.py $(SHACL) $(KNOWLEDGE)

@@ -78,18 +78,34 @@ test-sqlite-update:
sleep 1
cat $(OUTPUTDIR)/shacl-validation.sqlite | $(SQLITE3) $(SQLITEDB)


test-kms:
@echo Test different kms setups
cd tests/sql-tests && bash tests.sh


disable-strimzi:
kubectl -n $(NAMESPACE) scale deployment -l "operators.coreos.com/strimzi-kafka-operator.iff"="" --replicas=0


enable-strimzi:
kubectl -n $(NAMESPACE) scale deployment -l "operators.coreos.com/strimzi-kafka-operator.iff"="" --replicas=1


flink-deploy: clean
@echo deploy helm package
make helm
cd ../../helm && ./helmfile -f helmfile-shacl.yaml apply --args "--force"
make disable-strimzi || echo "Can fail"
make build || make enable-strimzi
kubectl -n $(NAMESPACE) delete -f output/ngsild-kafka.yaml --ignore-not-found || make enable-strimzi
kubectl -n $(NAMESPACE) delete -f output/rdf-kafka.yaml --ignore-not-found || make enable-strimzi
cd ../../helm && ./helmfile -f helmfile-shacl.yaml apply --args "--force" || make enable-strimzi
make enable-strimzi
make test-flink-is-deployed
sleep 2
kubectl -n $(NAMESPACE) -l "app.kubernetes.io/instance"="kafka-connect" delete pod


flink-undeploy:
@echo undeploy helm package
cd ../../helm && ./helmfile -f helmfile-shacl.yaml destroy
@@ -102,12 +118,14 @@ test-flink-is-deployed:
cd tests/bats && bats test-shacl-flink-deployment
rm tests/bats/lib


test-full-flink-deployment:
@echo test deployment of helm package
make helm
make flink-deploy
make flink-undeploy


test-flink-is-undeployed:
@echo test undeployment of helm package
ln -fs ../../../../test/bats/lib tests/bats/lib || echo 'Bats framework not installed'
7 changes: 4 additions & 3 deletions semantic-model/shacl2flink/create_ngsild_tables.py
@@ -64,7 +64,8 @@ def main(shaclfile, output_folder='output'):
config = {}
config['retention.ms'] = configs.kafka_topic_ngsi_retention
with open(os.path.join(output_folder, "ngsild.yaml"), "w") as f,\
open(os.path.join(output_folder, "ngsild.sqlite"), "w") as sqlitef:
open(os.path.join(output_folder, "ngsild.sqlite"), "w") as sqlitef,\
open(os.path.join(output_folder, "ngsild-kafka.yaml"), "w") as fk:
for table_name, table in tables.items():
connector = 'kafka'
primary_key = None
@@ -88,12 +89,12 @@
print('---', file=f)
yaml.dump(utils.create_yaml_view(table_name, table), f)
print(utils.create_sql_view(table_name, table), file=sqlitef)
print('---', file=f)
print('---', file=fk)
yaml.dump(utils.create_kafka_topic(f'{configs.kafka_topic_ngsi_prefix}.\
{utils.class_to_obj_name(table_name)}',
f'{configs.kafka_topic_ngsi_prefix}.\
{table_name}', configs.kafka_topic_object_label,
config), f)
config), fk)


if __name__ == '__main__':
10 changes: 5 additions & 5 deletions semantic-model/shacl2flink/create_rdf_table.py
@@ -120,7 +120,8 @@ def main(knowledgefile, output_folder='output'):
fp.write('\n')
fp.write(sqlstatements)

with open(os.path.join(output_folder, "rdf.yaml"), "w") as fp:
with open(os.path.join(output_folder, "rdf.yaml"), "w") as fp,\
open(os.path.join(output_folder, "rdf-kafka.yaml"), "w") as fk:
fp.write('---\n')
yaml.dump(utils.create_yaml_table(table_name, connector, table,
primary_key, kafka, value), fp)
@@ -130,12 +131,11 @@
fp.write("---\n")
yaml.dump(utils.create_statementset('rdf-statements' + str(num), [table_name],
[], None, [statementset]), fp)
fp.write("---\n")
yaml.dump(utils.create_kafka_topic(utils.class_to_obj_name(
configs.rdf_topic),
fk.write("---\n")
yaml.dump(utils.create_kafka_topic(utils.class_to_obj_name(configs.rdf_topic),
configs.rdf_topic,
configs.kafka_topic_object_label,
config), fp)
config), fk)


if __name__ == '__main__':
3 changes: 0 additions & 3 deletions semantic-model/shacl2flink/create_udfs.py
@@ -49,9 +49,6 @@ def main(output_folder='output'):
crd['spec'] = spec
f.write('---')
yaml.dump(crd, f)
with open(os.path.join(output_folder, 'test'), 'w') as f:
f.write('---')
f.close()


if __name__ == '__main__':
7 changes: 6 additions & 1 deletion semantic-model/shacl2flink/lib/utils.py
@@ -288,7 +288,12 @@ def create_statementset(object_name, table_object_names,
spec['tables'] = table_object_names
spec['views'] = view_object_names
if ttl is not None:
spec['sqlsettings'] = [{"table.exec.state.ttl": f"{ttl}", "execution.savepoint.ignore-unclaimed-state": "true"}]
spec['sqlsettings'] = [
{"table.exec.state.ttl": f"{ttl}"},
{"execution.savepoint.ignore-unclaimed-state": "true"},
{"pipeline.object-reuse": "true"},
{"parallelism.default": "{{ .Values.flink.defaultParalellism }}"}
]
spec['sqlstatements'] = statementsets
spec['updateStrategy'] = "savepoint"
return yaml_bsqls
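For reference, a sketch of the sqlsettings block that create_statementset now emits when a ttl is provided; the parallelism placeholder is left for Helm to render later (my reading of the diff above, not verified against the full file):

```python
ttl = '86400000 ms'  # hypothetical value handed in by the caller
expected_sqlsettings = [
    {"table.exec.state.ttl": f"{ttl}"},
    {"execution.savepoint.ignore-unclaimed-state": "true"},
    {"pipeline.object-reuse": "true"},
    {"parallelism.default": "{{ .Values.flink.defaultParalellism }}"},
]
# Previously all keys were packed into one dict; splitting them into
# single-key dicts matches the format the gateway and operator now iterate.
```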
7 changes: 4 additions & 3 deletions test/setup-local-ingress.sh
@@ -20,15 +20,16 @@ fi

# Patch Coredns to add keycloak.local
# ----------------------------------
kubectl -n kube-system get cm/coredns -o jsonpath=\{".data.Corefile"\} > /tmp/Corefile
kubectl -n kube-system get cm/coredns -o jsonpath=\{".data.NodeHosts"\} > /tmp/NodeHosts
kubectl -n kube-system get cm/coredns -o jsonpath=\{".data.Corefile"\} > /tmp/Corefile || exit 1
kubectl -n kube-system get cm/coredns -o jsonpath=\{".data.NodeHosts"\} > /tmp/NodeHosts || exit 1

while [ -z "$INGRESS_IP" ]; do
INGRESS_IP=$(kubectl -n iff get ingress/keycloak-iff-ingress -o jsonpath=\{".status.loadBalancer.ingress[0].ip"\})
echo waiting for ingress to provide IP-Address
sleep 1
done
echo "$INGRESS_IP" keycloak.local >> /tmp/NodeHosts
echo "$INGRESS_IP" keycloak.local >> /tmp/NodeHosts || exit 1

kubectl -n kube-system create cm coredns --from-file=/tmp/NodeHosts --from-file=/tmp/Corefile --dry-run=client -o yaml | kubectl replace -f -

# Restart coredns