Problema risolto ! Non posso credere di aver passato due giorni interi su questo... Stavo guardando completamente nella direzione sbagliata.
Il problema non riguardava alcune configurazioni di rete Dataflow o GCP e, per quanto ne so...
è vero.
Il problema era ovviamente nel mio codice:solo il problema veniva rivelato solo in un ambiente distribuito. Ho commesso l'errore di aprire il tunnel dal processore principale della pipeline, invece che dagli operai. Quindi il tunnel SSH era attivo ma non tra i lavoratori e il server di destinazione, solo tra la pipeline principale e la destinazione!
Per risolvere questo problema, ho dovuto modificare la mia richiesta DoFn per eseguire il wrapping dell'esecuzione della query con il tunnel:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
come puoi vedere, ho dovuto sovrascrivere alcuni bit della libreria pysql_beam.
Infine, ogni lavoratore apre il proprio tunnel per ogni richiesta. Probabilmente è possibile ottimizzare questo comportamento ma è sufficiente per le mie esigenze.