For the sake of sharing and searchability, here is a code snippet that runs Zingg's link phase across three in-memory data sources:
from zingg.client import *
from zingg.pipes import *
# Zingg run configuration: model identifier and Zingg working directory.
# modelId, zinggDir, source_path, output_path and spark must already be defined.
args = Arguments()
args.setModelId(modelId)
args.setZinggDir(zinggDir)

# Read the CSV with the "header" option enabled and keep only the two
# columns that are used as match fields below.
names = (
    spark.read
    .option("header", "true")
    .csv(source_path)
    .select("FirstName", "LastName")
)

# Three row-limited slices of the same DataFrame act as the three sources
# to be linked; each slice is wrapped in its own in-memory pipe.
slices = [names.limit(row_count) for row_count in (1000, 100, 200)]
input_pipes = [
    InMemoryPipe(label, frame)
    for label, frame in zip(("source1", "source2", "source3"), slices)
]

# Output pipe in "delta" format, with its "path" property set to output_path.
sink = Pipe(name="output", format="delta")
sink.addProperty("path", output_path)

args.setData(*input_pipes)
args.setOutput(sink)

# Both columns are declared as "string" fields with MatchType.FUZZY.
args.setFieldDefinition(
    [
        FieldDefinition(column, "string", MatchType.FUZZY)
        for column in ("FirstName", "LastName")
    ]
)

# Select the "link" phase and execute it.
link_options = ClientOptions([ClientOptions.PHASE, "link"])
runner = ZinggWithSpark(args, link_options)
runner.initAndExecute()
Feel free to let me know if something about this can be done better.