
GlobalTable not working when using multiple instances #283

Open
Hamdiovish opened this issue Mar 9, 2022 · 16 comments
Labels
bug Something isn't working help wanted Extra attention is needed

Comments

@Hamdiovish

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run the following app in two instances, then push a message to one partition. The instance listening to that partition detects the message and updates the GlobalTable, while the other instance never updates its copy of the GlobalTable and keeps showing nothing.

import faust
from faust.cli import option

app = faust.App(
    'hello-world',
    broker='kafka://localhost:29092',
    key_serializer='raw',
    value_serializer='raw',
    store='rocksdb://',
    topic_disable_leader=True,
)

greetings_topic = app.topic('greetings-topic', partitions=2, key_type=str, value_type=str, internal=True)
greetings_table = app.GlobalTable('greetings-table', partitions=2, key_type=str, value_type=str, default=str)

@app.agent(greetings_topic)
async def greet(greetings):
    """run: faust -A app worker"""
    async for event in greetings.events():
        k = event.key
        v = event.value
        print(f"processing: {k}: {v} on partition: {event.message}")
        greetings_table[k] = v

@app.command(
    option('--k', type=str, default='a', help='The key.'),
    option('--v', type=str, default='0', help='The value.'))
async def sim_greeting(self, k, v):
    """simulate: faust -A app01 sim-greeting --k a --v 0"""
    await greetings_topic.send(key=k, value=v)

@app.timer(interval=1.0)
async def plot(app):
    for k in greetings_table:
        print(f'[{k}:{greetings_table[k]}]')
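
To reproduce, start two workers with different data directories and web ports (the same commands used in the logs later in this thread, assuming the module is saved as app01.py), then send a message:

faust --datadir=data/01 -A app01 worker -l info --web-port 6060
faust --datadir=data/02 -A app01 worker -l info --web-port 6061
faust -A app01 sim-greeting --k a --v 0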

Expected behavior

All instances should detect GlobalTable updates.

Actual behavior

The GlobalTable is not synced across instances.

Versions

  • Python version: 3.7.12
  • Faust version: 0.8.4
  • Operating system: Ubuntu 18
  • Kafka version: 2.8-IV1
  • RocksDB version: 0.8.0
@Roman1us
Contributor

Please check the partitioning (which instance consumes which partition), and verify that the RocksDB stores use different paths on each instance.

@Hamdiovish
Author

@Roman1us Regarding paths, I'm using a different --datadir and --web-port when running each instance.

For partition consumption, I'm using the topic "greetings-topic" as the source topic for writes into the GlobalTable, and every instance is assigned a different partition of "greetings-topic".

The problem is that each instance can only consume/write data from/into the GlobalTable in the exact partition of the source event, as if it were a local Table.

The only difference with a GlobalTable is that at startup the recovery is done across all partitions, but once recovery is done, every instance consumes/updates the GlobalTable only in its assigned partition of "greetings-topic".

@brennerd11
Contributor

We observed the same issue with GlobalTables on multiple workers. Essentially they do not work currently :-(

@dario-collavini

Same here: we are running 5 worker instances that should take advantage of the same GlobalTable, but each actually sees it as a local Table. It's a shame, since we have to work around it by adding a lot of boilerplate code just to mimic the GlobalTable behaviour.

@Hamdiovish
Author

Hi @dario-collavini, what’s the approach you used to mock the GlobalTable?

@Roman1us
Contributor

@Hamdiovish @dario-collavini how many partitions does your GlobalTable topic have?

@Hamdiovish
Author

@Roman1us As mentioned in the "Steps to reproduce" section, the global table in my example has 2 partitions.

@Roman1us
Contributor

@Hamdiovish So you need the same table data across all workers, right? Maybe setting the topic to 1 partition and use_partitioner=True in the table config will help?

@Hamdiovish
Author

@Roman1us Thank you for the prompt reply.
I've updated the global table declaration (line 14 of my example above), adding use_partitioner=True and setting partitions=1, as follows:

greetings_table = app.GlobalTable('greetings-table', partitions=1, key_type=str, value_type=str, default=str, use_partitioner=True)

The wrong behavior is still happening. At initialization, the recovery occurs as expected and the two instances show the same content for the global table.
But later at runtime, when the global table is updated by instance_1, instance_2 doesn't detect the recent update and keeps showing the old data.
Here is the output of an example printing the content of the global table, based on the code above:

  • At startup, all instances show the latest version of the global table, as expected:

instance_0:

[2022-03-30 19:58:07,736] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 19:58:07,737] [4551] [WARNING] [EURCHF:2] 

instance_1:

[2022-03-30 19:58:01,407] [4535] [WARNING] [EURUSD:2] 
[2022-03-30 19:58:01,408] [4535] [WARNING] [EURCHF:2] 

  • Then we update the key EURUSD to, let's say, 3; the output will be:

instance_0:

[2022-03-30 20:01:07,768] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 20:01:07,770] [4551] [WARNING] [EURCHF:2] 

instance_1:

[2022-03-30 20:01:01,442] [4535] [WARNING] [EURUSD:3] 
[2022-03-30 20:01:01,442] [4535] [WARNING] [EURCHF:2] 

  • The same thing happens if we update the table from a partition managed by the other instance, say EURCHF to 0; the output will be:

instance_0:

[2022-03-30 20:02:47,787] [4551] [WARNING] [EURUSD:2] 
[2022-03-30 20:02:47,788] [4551] [WARNING] [EURCHF:0] 

instance_1:

[2022-03-30 20:02:51,462] [4535] [WARNING] [EURUSD:3] 
[2022-03-30 20:02:51,462] [4535] [WARNING] [EURCHF:2] 

@Roman1us
Contributor

@Hamdiovish Please show the output of kafka-topics.sh for the table topic. I think you have Kafka's default partition count (maybe 3). Also, please check which consumer consumes which partition. We have been using global tables for around a year with a partition count of 1 and no problems have happened.
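
For reference, something like this should show it (adjust the bootstrap server to your setup):

kafka-topics.sh --describe --topic hello-world-greetings-table-changelog --bootstrap-server localhost:29092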

@dario-collavini

We saw exactly the same issue reported by @Hamdiovish: updates were not available at runtime.
We were running about 5 partitions.

In our case the workaround is to use local Tables instead of GlobalTables and to ensure that topic partitioning and key structure match between agents and tables. The topic of each worker instance's agent that needs the table must be partitioned by the same key used to access the table data; that way the global data is split into local subsets, each accessible by the corresponding instance, which is guaranteed by using the same partition key.
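
A minimal sketch of that pattern, assuming the topic and the table share the same key type and partition count (the names and values here are illustrative, not our production code):

import faust

app = faust.App('prices', broker='kafka://localhost:9092', store='rocksdb://')

# Topic and table are partitioned identically and keyed the same way, so the
# record for a given key always lands on the partition this worker owns.
prices_topic = app.topic('prices-topic', partitions=5, key_type=str, value_type=str)
prices_table = app.Table('prices-table', partitions=5, key_type=str, value_type=str, default=str)

@app.agent(prices_topic)
async def update_price(stream):
    # Each worker only receives keys for its own partitions, and its local
    # Table holds exactly those keys, so lookups never cross instances.
    async for event in stream.events():
        prices_table[event.key] = event.value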

@Hamdiovish
Author

Hi @Roman1us, here are the details.
The output of kafka-topics.sh for the table changelog topic:

Topic: hello-world-greetings-table-changelog	TopicId: AiKHgqRmQ2W7Ob5CG-WpqA	PartitionCount: 1	ReplicationFactor: 1	Configs: cleanup.policy=compact
	Topic: hello-world-greetings-table-changelog	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

The output of the source topic:

Topic: greetings-topic	TopicId: FbrK1G84Tv-UfVF5K9EPSA	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: greetings-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: greetings-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

Regarding partition assignment:
The boot log for instance_0:

(faust) root@zarbia:/workspace/fg/test/faust# faust --datadir=data/02 -A app01 worker -l info  --web-port 6061
┌ƒaµS† v0.8.4─┬──────────────────────────────────────────┐
│ id          │ hello-world                              │
│ transport   │ [URL('kafka://localhost:29092')]         │
│ store       │ rocksdb:                                 │
│ web         │ http://localhost:6061/                   │
│ log         │ -stderr- (info)                          │
│ pid         │ 21375                                    │
│ hostname    │ zarbia                                   │
│ platform    │ CPython 3.7.12 (Linux x86_64)            │
│ drivers     │                                          │
│   transport │ aiokafka=0.7.2                           │
│   web       │ aiohttp=3.8.1                            │
│ datadir     │ /workspace/fg/test/faust/data/02         │
│ appdir      │ /workspace/fg/test/faust/data/02/v1      │
└─────────────┴──────────────────────────────────────────┘
[2022-03-31 14:19:54,286] [21375] [INFO] [^Worker]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^-App]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^--Monitor]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^--Producer]: Starting... 
[2022-03-31 14:19:54,288] [21375] [INFO] [^---ProducerBuffer]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^--CacheBackend]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^--Web]: Starting... 
[2022-03-31 14:19:54,301] [21375] [INFO] [^---Server]: Starting... 
[2022-03-31 14:19:54,302] [21375] [INFO] [^--Consumer]: Starting... 
[2022-03-31 14:19:54,303] [21375] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--LeaderAssignor]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--ReplyConsumer]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^--AgentManager]: Starting... 
[2022-03-31 14:19:54,321] [21375] [INFO] [^---Agent: app01.greet]: Starting... 
[2022-03-31 14:19:54,323] [21375] [INFO] [^----OneForOneSupervisor: (1@0x7f407806e190)]: Starting... 
[2022-03-31 14:19:54,323] [21375] [INFO] [^---Conductor]: Starting... 
[2022-03-31 14:19:54,324] [21375] [INFO] [^--TableManager]: Starting... 
[2022-03-31 14:19:54,324] [21375] [INFO] [^---Conductor]: Waiting for agents to start... 
[2022-03-31 14:19:54,325] [21375] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2022-03-31 14:19:55,325] [21375] [INFO] [^---GlobalTable: greetings-table]: Starting... 
[2022-03-31 14:19:55,338] [21375] [INFO] [^----Store: rocksdb:greetings-table]: Starting... 
[2022-03-31 14:19:55,338] [21375] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:55,345] [21375] [INFO] [^---Recovery]: Starting... 
[2022-03-31 14:19:55,346] [21375] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:55,350] [21375] [INFO] Updating subscribed topics to: 
┌Requested Subscription─────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:55,351] [21375] [INFO] Subscribed to topic(s): 
┌Final Subscription─────────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:55,358] [21375] [INFO] Discovered coordinator 1 for group hello-world 
[2022-03-31 14:19:55,358] [21375] [INFO] Revoking previously assigned partitions set() for group hello-world 
[2022-03-31 14:19:55,359] [21375] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:58,284] [21375] [INFO] Joined group 'hello-world' (generation 32) with member_id faust-0.8.4-d07053dd-c146-4268-a760-be83c23c9d82 
[2022-03-31 14:19:58,289] [21375] [INFO] Successfully synced group hello-world with generation 32 
[2022-03-31 14:19:58,290] [21375] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0}        │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,292] [21375] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:58,320] [21375] [INFO] generation id 32 app consumers id 32 
[2022-03-31 14:19:59,927] [21375] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:20:01,428] [21375] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:20:02,930] [21375] [INFO] [^---Recovery]: standby offsets at start of reading:
┌Reading Starts At - Standby────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:20:02,932] [21375] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2022-03-31 14:20:02,932] [21375] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:20:02,933] [21375] [INFO] [^---Fetcher]: Starting... 
[2022-03-31 14:20:04,508] [21375] [INFO] [^---Recovery]: Done reading from changelog topics 
[2022-03-31 14:20:04,509] [21375] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:20:04,509] [21375] [INFO] [^---Recovery]: Starting standby partitions... 
[2022-03-31 14:20:05,937] [21375] [INFO] [^---Recovery]: Highwater for standby changelog partitions:
┌Highwater - Standby────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:20:05,938] [21375] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:20:05,939] [21375] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:20:05,942] [21375] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:20:05,943] [21375] [INFO] [^Worker]: Ready 
[2022-03-31 14:20:15,946] [21375] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:15,946] [21375] [WARNING] [EURCHF:1]** 
**[2022-03-31 14:20:23,054] [21375] [WARNING] processing: EURCHF: 2 on partition: <ConsumerMessage: TopicPartition(topic='greetings-topic', partition=0) offset=8>** 
[2022-03-31 14:20:25,946] [21375] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:25,947] [21375] [WARNING] [EURCHF:2]** 

[2022-03-31 14:20:35,950] [21375] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:35,950] [21375] [WARNING] [EURCHF:2] 

The boot log of instance_1:

(faust) root@zarbia:/workspace/fg/test/faust# faust --datadir=data/01 -A app01 worker -l info  --web-port 6060
┌ƒaµS† v0.8.4─┬──────────────────────────────────────────┐
│ id          │ hello-world                              │
│ transport   │ [URL('kafka://localhost:29092')]         │
│ store       │ rocksdb:                                 │
│ web         │ http://localhost:6060/                   │
│ log         │ -stderr- (info)                          │
│ pid         │ 21310                                    │
│ hostname    │ zarbia                                   │
│ platform    │ CPython 3.7.12 (Linux x86_64)            │
│ drivers     │                                          │
│   transport │ aiokafka=0.7.2                           │
│   web       │ aiohttp=3.8.1                            │
│ datadir     │ /workspace/fg/test/faust/data/01         │
│ appdir      │ /workspace/fg/test/faust/data/01/v1      │
└─────────────┴──────────────────────────────────────────┘
[2022-03-31 14:19:50,804] [21310] [INFO] [^Worker]: Starting... 
[2022-03-31 14:19:50,806] [21310] [INFO] [^-App]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^--Monitor]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^--Producer]: Starting... 
[2022-03-31 14:19:50,807] [21310] [INFO] [^---ProducerBuffer]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--CacheBackend]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--Web]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^---Server]: Starting... 
[2022-03-31 14:19:50,819] [21310] [INFO] [^--Consumer]: Starting... 
[2022-03-31 14:19:50,820] [21310] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2022-03-31 14:19:50,836] [21310] [INFO] [^--LeaderAssignor]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^--ReplyConsumer]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^--AgentManager]: Starting... 
[2022-03-31 14:19:50,837] [21310] [INFO] [^---Agent: app01.greet]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^----OneForOneSupervisor: (1@0x7efff4120150)]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^---Conductor]: Starting... 
[2022-03-31 14:19:50,839] [21310] [INFO] [^--TableManager]: Starting... 
[2022-03-31 14:19:50,840] [21310] [INFO] [^---Conductor]: Waiting for agents to start... 
[2022-03-31 14:19:50,840] [21310] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2022-03-31 14:19:51,840] [21310] [INFO] [^---GlobalTable: greetings-table]: Starting... 
[2022-03-31 14:19:51,853] [21310] [INFO] [^----Store: rocksdb:greetings-table]: Starting... 
[2022-03-31 14:19:51,853] [21310] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:51,860] [21310] [INFO] [^---Recovery]: Starting... 
[2022-03-31 14:19:51,861] [21310] [INFO] [^--Producer]: Creating topic 'hello-world-greetings-table-changelog' 
[2022-03-31 14:19:51,864] [21310] [INFO] Updating subscribed topics to: 
┌Requested Subscription─────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:51,865] [21310] [INFO] Subscribed to topic(s): 
┌Final Subscription─────────────────────┐
│ topic name                            │
├───────────────────────────────────────┤
│ greetings-topic                       │
│ hello-world-greetings-table-changelog │
└───────────────────────────────────────┘ 
[2022-03-31 14:19:51,872] [21310] [INFO] Discovered coordinator 1 for group hello-world 
[2022-03-31 14:19:51,872] [21310] [INFO] Revoking previously assigned partitions set() for group hello-world 
[2022-03-31 14:19:51,874] [21310] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:51,977] [21310] [INFO] Joined group 'hello-world' (generation 31) with member_id faust-0.8.4-ee0a42ea-0c88-4279-bd87-69e6cb5368c9 
[2022-03-31 14:19:51,977] [21310] [INFO] Elected group leader -- performing partition assignments using faust 
[2022-03-31 14:19:51,982] [21310] [INFO] Successfully synced group hello-world with generation 31 
[2022-03-31 14:19:51,983] [21310] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0-1}      │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:51,985] [21310] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:52,008] [21310] [INFO] opening partition 0 for gen id 31 app id 31 
[2022-03-31 14:19:52,271] [21310] [INFO] generation id 31 app consumers id 31 
[2022-03-31 14:19:53,616] [21310] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:19:55,117] [21310] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ -1     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:19:55,121] [21310] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2022-03-31 14:19:55,121] [21310] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:19:55,122] [21310] [INFO] [^---Fetcher]: Starting... 
[2022-03-31 14:19:56,627] [21310] [INFO] [^---Recovery]: Done reading from changelog topics 
[2022-03-31 14:19:56,627] [21310] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:19:56,628] [21310] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:19:56,628] [21310] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:19:56,632] [21310] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:19:56,633] [21310] [INFO] [^Worker]: Ready 

[2022-03-31 14:19:58,277] [21310] [WARNING] Heartbeat failed for group hello-world because it is rebalancing 
[2022-03-31 14:19:58,278] [21310] [INFO] Revoking previously assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {0-1}      │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,281] [21310] [INFO] (Re-)joining group hello-world 
[2022-03-31 14:19:58,284] [21310] [INFO] Joined group 'hello-world' (generation 32) with member_id faust-0.8.4-ee0a42ea-0c88-4279-bd87-69e6cb5368c9 
[2022-03-31 14:19:58,284] [21310] [INFO] Elected group leader -- performing partition assignments using faust 
[2022-03-31 14:19:58,289] [21310] [INFO] Successfully synced group hello-world with generation 32 
[2022-03-31 14:19:58,290] [21310] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────────────────────┬────────────┐
│ topic                                 │ partitions │
├───────────────────────────────────────┼────────────┤
│ greetings-topic                       │ {1}        │
│ hello-world-greetings-table-changelog │ {0}        │
└───────────────────────────────────────┴────────────┘ for group hello-world 
[2022-03-31 14:19:58,292] [21310] [INFO] Executing _on_partitions_assigned 
[2022-03-31 14:19:58,321] [21310] [INFO] generation id 32 app consumers id 32 
[2022-03-31 14:19:59,626] [21310] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─────────────────────┬───────────┬───────────┐
│ topic                                 │ partition │ highwater │
├───────────────────────────────────────┼───────────┼───────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11        │
└───────────────────────────────────────┴───────────┴───────────┘ 
[2022-03-31 14:19:59,630] [21310] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active─────────────┬───────────┬────────┐
│ topic                                 │ partition │ offset │
├───────────────────────────────────────┼───────────┼────────┤
│ hello-world-greetings-table-changelog │ 0         │ 11     │
└───────────────────────────────────────┴───────────┴────────┘ 
[2022-03-31 14:19:59,634] [21310] [INFO] [^---Recovery]: Resuming flow... 
[2022-03-31 14:19:59,635] [21310] [INFO] [^---Recovery]: Recovery complete 
[2022-03-31 14:19:59,635] [21310] [INFO] [^---Recovery]: Restore complete! 
[2022-03-31 14:19:59,636] [21310] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-03-31 14:19:59,639] [21310] [INFO] [^---Recovery]: Worker ready 
[2022-03-31 14:20:06,634] [21310] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:06,635] [21310] [WARNING] [EURCHF:1] 
[2022-03-31 14:20:16,636] [21310] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:16,637] [21310] [WARNING] [EURCHF:1]** 
[2022-03-31 14:20:26,637] [21310] [WARNING] [EURUSD:3] 
**[2022-03-31 14:20:26,638] [21310] [WARNING] [EURCHF:1]** 

[2022-03-31 14:20:36,641] [21310] [WARNING] [EURUSD:3] 
[2022-03-31 14:20:36,641] [21310] [WARNING] [EURCHF:1] 

Just a quick note: GlobalTable works pretty well when there is only one instance, but the problems start once another instance is added.
@Roman1us Do you have any minimal working example of a GlobalTable with multiple instances? Or feel free to run the script above to trigger the issue on your end.
Thank you!

@Roman1us
Contributor

Roman1us commented Apr 4, 2022

@Hamdiovish Okay, I'll try to reproduce this and maybe debug the behavior.

@lorinmetzger
Contributor

lorinmetzger commented May 1, 2022

I have experienced a similar problem, with a global table that has a single-partition changelog topic and multiple workers.

I've narrowed my problem down to the fact that at least one worker out of a group of workers sharing a global table will be assigned only "active" changelog topic partitions and no standbys, while all the other workers get both actives and standbys assigned to them here.

For the worker that has "actives" only, when _restart_recovery runs in recovery.py it pauses all active partitions after the initial recovery. That is correct if the "active" partitions are also in the standby set, because the standby partition set gets resumed here; but when the active partitions are not in the standby set, nothing ever resumes the changelog topic partitions for that worker.

This problem can be masked if the worker that has only "active" partitions assigned is also the leader of the Faust topic partition being used to modify the table; in that case the local modifications keep the table in sync even though the "active" changelog partitions were never resumed (but this pairing is not guaranteed). As soon as the leader of the topic updating the table is a different worker from the one holding only active changelog topic partitions and no standbys, that worker's table stops receiving update events and gets out of sync.

I'm not sure that this is actually the right place to address the problem, but the workaround I've put in is here. Curious if there is a better place to address this?
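
For context, the gist of the workaround is roughly the following (a sketch only; the attribute and method names are assumptions about Faust's Recovery/consumer internals, not a verbatim copy of the patch):

# Rough sketch: after recovery completes, resume any active changelog
# partitions that no standby covers, so this worker keeps consuming table
# updates. `active_tps`, `standby_tps` and `resume_partitions` are assumed
# names for the internals involved.
def resume_uncovered_actives(recovery) -> None:
    uncovered = recovery.active_tps - recovery.standby_tps
    if uncovered:
        recovery.app.consumer.resume_partitions(uncovered)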

One last note for anyone not doing high-volume table updates: if you are checking that your tables stay in sync one change at a time, you will want to set recovery_buffer_size=1 to make sure your changes are applied immediately. Otherwise your tables will appear out of sync even though the change events are being received; they are just not applied yet.

@wbarnha
Member

wbarnha commented Aug 10, 2022

The current solution to this problem as of v0.8.6 is to initialize a GlobalTable with the options partitions=1 and recovery_buffer_size=1, as indicated above.
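
Applied to the example in this issue, that looks like the following (a sketch; only the table declaration changes):

# Single changelog partition plus a recovery buffer of 1, so changelog
# events are applied to every worker's copy immediately.
greetings_table = app.GlobalTable(
    'greetings-table',
    partitions=1,
    recovery_buffer_size=1,
    key_type=str,
    value_type=str,
    default=str,
)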

@wbarnha wbarnha added bug Something isn't working help wanted Extra attention is needed labels Aug 30, 2022
@tobiasbhn

I am also facing this issue. The mentioned solution does not work for us, as we need more than one partition in order to split the work across multiple physical nodes. Am I missing something?
