How to pass the training set and labels to the Zingg trainMatch phase using Scala in an AWS Glue environment?
Hey! First of all, thank you for your support and help last time. I was able to test Zingg with Python and PySpark in my dev environment and it went well. Now I need a little more assistance: I am trying to run it with Scala (we will need to run this in a Glue environment). I need to go directly to the trainMatch phase, providing both the training set and the labels set with the two fields required by Zingg (z_cluster, z_isMatch). I checked the Scala example in the Zingg repository, but it only goes through the match phase, so it never needs the labels. So I need to know which methods to use in Scala to pass the JSON config (as we do in Python), or how to do it with an in-memory pipe (InMemoryPipe). Here is my incomplete example code:
// Input columns shared by the training data and the labelled samples.
// A Seq (not a Set) keeps the projected column order deterministic.
val cols = Seq(
  col("id"),
  col("n_stripped_venitem"),
  col("n_stripped_mfgitem"),
  col("n_vendorcurvo"),
  col("n_manufacturercurvo"),
  col("n_vendor"),
  col("n_manufacturer"),
  col("n_description"),
  col("s_venitem"),
  col("s_mfgitem")
)
// The two label columns Zingg requires on training samples, renamed from the
// source table's labeling_set_id / is_match. Named `zinggCols` because that is
// the name the labels projection refers to.
val zinggCols = Seq(
  col("labeling_set_id").as("z_cluster"),
  col("is_match").as("z_isMatch")
)
// Loads one table from the staging Glue Data Catalog database as a Spark DataFrame.
def readStagingTable(table: String) =
  glueContext
    .getCatalogSource(database = dataStagingGlueCatalogDb, tableName = table)
    .getDynamicFrame
    .toDF()

// Unlabelled records, projected to the input columns only.
val trainingSet = readStagingTable("zingg_training_set").select(cols: _*)

// Labelled records: the Zingg label columns followed by the input columns.
// NOTE(review): the table name is spelled "lables" — confirm it matches the catalog entry.
val trainingLabels = readStagingTable("zingg_training_lables").select((zinggCols ++ cols): _*)
// Zingg Field Definition
val args = new Arguments()

// Builds a Zingg definition for one string column.
// `fields` must name the column itself (the same value as `fieldName`), not a placeholder.
// Several match types may be given, e.g. zinggField("x", MatchType.EXACT, MatchType.FUZZY).
def zinggField(name: String, matchTypes: MatchType*): FieldDefinition = {
  val field = new FieldDefinition()
  field.setFieldName(name)
  // NOTE(review): some Zingg versions parse dataType as a JSON-encoded Spark type and
  // expect "\"string\"" (the Python API adds the quotes) — confirm for your Zingg version.
  field.setDataType("string")
  field.setMatchType(new ArrayList[MatchType](Arrays.asList(matchTypes: _*)))
  field.setFields(name)
  field
}

val id = zinggField("id", MatchType.DONT_USE)
val nStrippedVenitem = zinggField("n_stripped_venitem", MatchType.EXACT)
val nStrippedMfgitem = zinggField("n_stripped_mfgitem", MatchType.EXACT)
val nVendorCurvo = zinggField("n_vendorcurvo", MatchType.FUZZY)
val nManufacturerCurvo = zinggField("n_manufacturercurvo", MatchType.FUZZY)
val nVendor = zinggField("n_vendor", MatchType.FUZZY)
val nManufacturer = zinggField("n_manufacturer", MatchType.FUZZY)
val nDescription = zinggField("n_description", MatchType.TEXT)
val sVenitem = zinggField("s_venitem", MatchType.EXACT)
val sMfgitem = zinggField("s_mfgitem", MatchType.EXACT)

// Same order as the `cols` projection of the input data.
val fieldDef = new ArrayList[FieldDefinition](Arrays.asList(
  id,
  nStrippedVenitem,
  nStrippedMfgitem,
  nVendorCurvo,
  nManufacturerCurvo,
  nVendor,
  nManufacturer,
  nDescription,
  sVenitem,
  sMfgitem
))
args.setFieldDefinition(fieldDef)
args.setModelId("000")
args.setZinggDir(s"s3://${dataStagingBucket}/models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.2f)

// Records to match: the unlabelled training set, handed to Zingg as an in-memory DataFrame.
val inputPipe = new InMemoryPipe(trainingSet)
inputPipe.setProps(new HashMap[String, String]())
val pipes = Array[zingg.client.pipe.Pipe](inputPipe)
args.setData(pipes)

// Labelled pairs (carrying z_cluster / z_isMatch) are supplied as training samples so the
// trainMatch phase can learn from them; otherwise `trainingLabels` is never used.
// NOTE(review): setTrainingSamples mirrors the "trainingSamples" key of the JSON config —
// confirm the setter name against the Zingg version on the classpath.
val labelsPipe = new InMemoryPipe(trainingLabels)
labelsPipe.setProps(new HashMap[String, String]())
args.setTrainingSamples(Array[zingg.client.pipe.Pipe](labelsPipe))