Thank you, Sonal. I wasn't able to find a programmatic way to provide the predefined "trainingSetLabels" or to use them with the InMemoryPipe class. Is there a way to set both the training set input and the training set labels as inputs for Zingg? For example, the Arguments class has the methods setData and setOut, but there isn't a corresponding method for the predefined labels.
Yes. I see. In my python script, I call Zingg like this:
options = ClientOptions(["--phase", "trainMatch", "--conf", "/mnt/config/config-010.json"])
args = Arguments.createArgumentsFromJSON(options.getConf(), options.getPhase())
zingg = ZinggWithSpark(args, options)
zingg.init()
zingg.execute()
The config JSON specifies both the training set parquet file location and the label set file. I would like to do something similar in Scala, but wasn't able to find a "createArgumentsFromJSON" method. Is there such a method, or Scala API documentation I could dig into and adapt to my needs?
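For reference, a minimal sketch of generating that kind of config from Python instead of hand-editing it. The key names (fieldDefinition, trainingSamples, data, output and so on) mirror the config file posted further down in this thread; the helper names field, pipe and build_config, the /tmp/zingg-example paths, and the choice to default "fields" to the field name are assumptions made for illustration, not part of Zingg.

```python
import json


def field(name, match_type, data_type="string", fields=None):
    """One fieldDefinition entry. Defaulting "fields" to the field name is an assumption."""
    return {
        "fieldName": name,
        "matchType": match_type,
        "fields": fields if fields is not None else name,
        "dataType": data_type,
    }


def pipe(name, fmt, location, **props):
    """One pipe entry (used for data, trainingSamples and output)."""
    return {"name": name, "format": fmt, "props": {"location": location, **props}}


def build_config(match_types, data_location, labels_location, output_location,
                 model_id, zingg_dir):
    """Assemble a config dict with the same top-level keys as the thread's config.json."""
    return {
        "fieldDefinition": [field(n, m) for n, m in match_types.items()],
        "trainingSamples": [pipe("trainingSetLabels", "parquet", labels_location)],
        "output": [pipe("matchesOutput", "csv", output_location,
                        delimiter=",", header=True)],
        "data": [pipe("trainingSetInput", "parquet", data_location)],
        "labelDataSampleSize": 0.5,
        "numPartitions": 15,
        "modelId": model_id,
        "zinggDir": zingg_dir,
    }


# Hypothetical locations, for illustration only.
config = build_config(
    match_types={"id": "dont_use", "n_vendor": "fuzzy", "n_description": "fuzzy"},
    data_location="/tmp/zingg-example/training-set",
    labels_location="/tmp/zingg-example/labels",
    output_location="/tmp/zingg-example/matches",
    model_id="000",
    zingg_dir="models",
)
config_json = json.dumps(config, indent=2)
print(config_json)
```

Writing config_json to a file and passing that path to --conf would then work the same way as a hand-written config.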
Yes. I used this as reference. I just want to know if there is a way to do the same using Scala.
Hey! First of all, thank you for your support and help last time. I was able to test Zingg with Python and PySpark in my dev environment and it went well. Now I need a little more assistance: I am trying to run it with Scala (we will need to run this in a Glue environment). I need to go directly to the trainMatch phase, providing both the training set and the label set with the two fields required by Zingg (z_cluster, z_isMatch). I checked the Scala example provided in the Zingg repository, but it only runs the match phase, which does not need labels. So I need to know which methods to use in Scala to pass the JSON config (as we do in Python) or to do it using the in-memory pipe. Here is my incomplete example code:
import java.util.{ArrayList, Arrays, HashMap}
import org.apache.spark.sql.functions.col
// Seq rather than Set: select(cols: _*) expects a Seq and keeps column order
val cols = Seq(
col("id"),
col("n_stripped_venitem"),
col("n_stripped_mfgitem"),
col("n_vendorcurvo"),
col("n_manufacturercurvo"),
col("n_vendor"),
col("n_manufacturer"),
col("n_description"),
col("s_venitem"),
col("s_mfgitem")
)
val zinggCols = Seq(
col("labeling_set_id").as("z_cluster"),
col("is_match").as("z_isMatch")
)
val trainingSet = glueContext
.getCatalogSource(
database = dataStagingGlueCatalogDb,
tableName = "zingg_training_set"
)
.getDynamicFrame
.toDF()
.select(cols: _*)
val trainingLabels = glueContext
.getCatalogSource(
database = dataStagingGlueCatalogDb,
tableName = "zingg_training_lables"
)
.getDynamicFrame
.toDF()
.select((zinggCols ++ cols): _*)
// Zingg Field Definition
val args = new Arguments()
val id = new FieldDefinition()
id.setFieldName("id")
id.setDataType("string")
id.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.DONT_USE)))
id.setFields("fname")
val nStrippedVenitem = new FieldDefinition()
nStrippedVenitem.setFieldName("n_stripped_venitem")
nStrippedVenitem.setDataType("string")
nStrippedVenitem.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.EXACT)))
nStrippedVenitem.setFields("fname")
val nStrippedMfgitem = new FieldDefinition()
nStrippedMfgitem.setFieldName("n_stripped_mfgitem")
nStrippedMfgitem.setDataType("string")
nStrippedMfgitem.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.EXACT)))
nStrippedMfgitem.setFields("fname")
val nVendorCurvo = new FieldDefinition()
nVendorCurvo.setFieldName("n_vendorcurvo")
nVendorCurvo.setDataType("string")
nVendorCurvo.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.FUZZY)))
nVendorCurvo.setFields("fname")
val nManufacturerCurvo = new FieldDefinition()
nManufacturerCurvo.setFieldName("n_manufacturercurvo")
nManufacturerCurvo.setDataType("string")
nManufacturerCurvo.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.FUZZY)))
nManufacturerCurvo.setFields("fname")
val nVendor = new FieldDefinition()
nVendor.setFieldName("n_vendor")
nVendor.setDataType("string")
nVendor.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.FUZZY)))
nVendor.setFields("fname")
val nManufacturer = new FieldDefinition()
nManufacturer.setFieldName("n_manufacturer")
nManufacturer.setDataType("string")
nManufacturer.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.FUZZY)))
nManufacturer.setFields("fname")
val nDescription = new FieldDefinition()
nDescription.setFieldName("n_description")
nDescription.setDataType("string")
nDescription.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.TEXT)))
nDescription.setFields("fname")
val sVenitem = new FieldDefinition()
sVenitem.setFieldName("s_venitem")
sVenitem.setDataType("string")
sVenitem.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.EXACT)))
sVenitem.setFields("fname")
val sMfgitem = new FieldDefinition()
sMfgitem.setFieldName("s_mfgitem")
sMfgitem.setDataType("string")
sMfgitem.setMatchType(new ArrayList[MatchType](Arrays.asList(MatchType.EXACT)))
sMfgitem.setFields("fname")
val fieldDef = new ArrayList[FieldDefinition]()
fieldDef.add(id)
fieldDef.add(nStrippedVenitem)
fieldDef.add(nStrippedMfgitem)
fieldDef.add(nVendorCurvo)
fieldDef.add(nManufacturerCurvo)
fieldDef.add(nVendor)
fieldDef.add(nManufacturer)
fieldDef.add(nDescription)
fieldDef.add(sVenitem)
fieldDef.add(sMfgitem)
args.setFieldDefinition(fieldDef)
args.setModelId("000")
args.setZinggDir(s"s3://${dataStagingBucket}/models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.2f)
val inputPipe = new InMemoryPipe(trainingSet)
inputPipe.setProps(new HashMap[String, String]())
val pipes = Array[zingg.client.pipe.Pipe](inputPipe)
args.setData(pipes)
Is there a way to do this and overcome this error?
That is the case. I would like to provide the labels manually and go directly to the trainMatch phase.
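To make the shape of manually provided labels concrete, here is a minimal plain-Python sketch. It assumes labeled rows carry id, labeling_set_id and label columns, and it mirrors the pairing logic of the SQL query posted later in this thread: every ordered pair of distinct rows within a labeling set becomes two output rows that share a fresh z_cluster and the same z_isMatch flag. The function name build_labeled_pairs and the sample rows are hypothetical.

```python
import uuid
from collections import defaultdict
from itertools import permutations


def build_labeled_pairs(rows):
    """Expand labeled rows into pair rows tagged with z_cluster and z_isMatch."""
    # Group rows by labeling set so pairs are only formed within a set.
    by_set = defaultdict(list)
    for row in rows:
        by_set[row["labeling_set_id"]].append(row)

    out = []
    for members in by_set.values():
        # Ordered pairs, like the self-join with a.id != b.id.
        for a, b in permutations(members, 2):
            if a["id"] == b["id"]:
                continue
            cluster = str(uuid.uuid4())            # one id per pair
            is_match = a["label"] == b["label"]    # same label means a match
            for rec in (a, b):
                out.append({**rec, "z_cluster": cluster, "z_isMatch": is_match})
    return out


# Hypothetical sample data.
rows = [
    {"id": "1", "labeling_set_id": "s1", "label": "A"},
    {"id": "2", "labeling_set_id": "s1", "label": "A"},
    {"id": "3", "labeling_set_id": "s1", "label": "B"},
    {"id": "4", "labeling_set_id": "s2", "label": "C"},
]
pairs = build_labeled_pairs(rows)
for p in pairs[:4]:
    print(p["id"], p["z_cluster"], p["z_isMatch"])
```

Because permutations is used, each pair appears in both orders, just as in the self-join; itertools.combinations would emit each pair once.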
Hey! Using Zingg version 0.4.0 with Spark 3.5 and Python 3.9 in a local container, I no longer run into the same issue, but I found a new error. Here is my code and the config.json file:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Defining spark session first
spark = (
SparkSession
.builder
.master("local[*]")
.appName("zingg-test")
.config("spark.jars", "/zingg/zingg-0.4.0.jar")
.getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
# The same session will be used when calling the Zingg scripts
from zingg.pipes import *
from zingg.client import *
# Load training data
df_training = (
spark
.read
.parquet("/mnt/b001-training-set/batch_id=99999")
)
# Save a string-cast version of it
df_training_set_string_casted = df_training.select([col(x).cast("string") for x in df_training.columns])
(
df_training_set_string_casted
.write
.format("parquet")
.mode("overwrite")
.save("/mnt/temp/b001-training-set-string-casted")
)
# Load pre-existing labeled data
df_training_labels = (
spark
.read
.parquet("/mnt/b002-labels/batch_id=99999")
.drop("train", "enforce")
)
df_training_labels.createOrReplaceTempView("training")
# Prepare data in acceptable format for Zingg
query_prepare_training_set = """
with pairs as (
select array(struct(a.*), struct(b.*)) as pair,
(a.label == b.label) as z_isMatch,
uuid() as z_cluster
from training a, training b
where 1 = 1
and a.labeling_set_id = b.labeling_set_id
and a.id != b.id
),
book as (
select explode(pairs.pair) as book,
z_cluster,
z_isMatch
from pairs
)
select book.id as z_zid,
"sample" as z_source,
z_cluster,
z_isMatch,
book.id,
book.n_stripped_venitem,
book.n_stripped_mfgitem,
book.n_vendorcurvo,
book.n_manufacturercurvo,
book.n_vendor,
book.n_manufacturer,
book.n_description,
book.n_stripped_venitem_long_length,
book.n_stripped_mfgitem_long_length,
book.n_stripped_venitem_prefix,
book.n_stripped_mfgitem_prefix,
book.n_vendor_3,
book.n_manufacturer_3,
book.n_stripped_venitem_numeric,
book.n_stripped_mfgitem_numeric,
book.n_description_numeric,
book.n_stripped_mfgitem_freq_indicator,
book.n_stripped_venitem_freq_indicator,
book.n_description_prefix_numeric,
book.n_description_postfix_numeric,
book.normalized_description,
book.reproc
from book
"""
df_labels = spark.sql(query_prepare_training_set)
df_labels_string_cast = df_labels.select([col(x).cast("string") for x in df_labels.columns])
(
df_labels_string_cast
.write
.format("parquet")
.mode("overwrite")
.save("/mnt/models/recording_matching/000/trainingData/marked")
)
# Zingg
options = ClientOptions(["--phase", "trainMatch", "--conf", "/mnt/config/config.json"])
args = Arguments.createArgumentsFromJSON(options.getConf(), options.getPhase())
zingg = ZinggWithSpark(args, options)
zingg.init()
zingg.execute()
The config file:
{
"fieldDefinition": [
{
"fieldName": "id",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_venitem",
"matchType": "only_alphabets_exact",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_mfgitem",
"matchType": "only_alphabets_exact",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_vendorcurvo",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_manufacturercurvo",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_vendor",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_manufacturer",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_description",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_venitem_long_length",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_mfgitem_long_length",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_venitem_prefix",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_mfgitem_prefix",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_vendor_3",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_manufacturer_3",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_venitem_numeric",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_mfgitem_numeric",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_description_numeric",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_mfgitem_freq_indicator",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_stripped_venitem_freq_indicator",
"matchType": "dont_use",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_description_prefix_numeric",
"matchType": "null_or_blank",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "n_description_postfix_numeric",
"matchType": "null_or_blank",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "normalized_description",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
},
{
"fieldName": "reproc",
"matchType": "fuzzy",
"fields": "fname",
"dataType": "string"
}
],
"trainingSamples" : [
{
"name":"trainingSetLabels",
"format":"parquet",
"props": {
"location": "/mnt/models/record_matching/000/trainingData/marked"
}
}
],
"output": [
{
"name":"matchesOutput",
"format":"csv",
"props": {
"location": "/mnt/csv/matches_output",
"delimiter": ",",
"header": true
}
}
],
"data": [
{
"name":"trainingSetInput",
"format":"parquet",
"props": {
"location": "/mnt/temp/b001-training-set-string-casted"
}
}
],
"labelDataSampleSize" : 0.5,
"numPartitions": 15,
"modelId": "000",
"zinggDir": "models"
}
And got this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[33], line 3
1 zingg = ZinggWithSpark(args, options)
2 zingg.init()
----> 3 zingg.execute()
File /usr/local/lib/python3.9/dist-packages/zingg/client.py:128, in Zingg.execute(self)
126 def execute(self):
127 """ Method to execute this class object """
--> 128 self.client.execute()
File /usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
1298 command = proto.CALL_COMMAND_NAME +\
1299 self.command_header +\
1300 args_command +\
1301 proto.END_COMMAND_PART
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1307 for temp_arg in temp_args:
1308 temp_arg._detach()
File /spark/python/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File /usr/local/lib/python3.9/dist-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o921.execute.
: zingg.common.client.ZinggClientException
at zingg.common.core.executor.Trainer.execute(Trainer.java:60)
at zingg.common.core.executor.TrainMatcher.execute(TrainMatcher.java:34)
at zingg.common.client.Client.execute(Client.java:251)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
I am following the same steps as the example provided here: https://aws.amazon.com/blogs/big-data/entity-resolution-and-fuzzy-matches-in-aws-glue-using-the-zingg-open-source-library/ Could you point out what I am doing wrong, please?
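One thing that is cheap to rule out: the script above writes the labeled pairs to /mnt/models/recording_matching/000/trainingData/marked, while the trainingSamples entry in the config points to /mnt/models/record_matching/000/trainingData/marked. Whether this is what triggers the ZinggClientException is not established here. A minimal sketch of a check that compares the two, where the function name check_label_location is hypothetical and only the relevant config fragment is inlined:

```python
import json
import posixpath


def check_label_location(config, written_path):
    """Return (pipe name, configured location) for every trainingSamples
    entry whose location differs from the path the labels were written to."""
    mismatches = []
    for p in config.get("trainingSamples", []):
        location = p.get("props", {}).get("location")
        if location is None or posixpath.normpath(location) != posixpath.normpath(written_path):
            mismatches.append((p.get("name"), location))
    return mismatches


# Fragment of the config from this thread.
config = json.loads("""
{
  "trainingSamples": [
    {
      "name": "trainingSetLabels",
      "format": "parquet",
      "props": {"location": "/mnt/models/record_matching/000/trainingData/marked"}
    }
  ]
}
""")

# Path used in the .save(...) call of the script.
written_path = "/mnt/models/recording_matching/000/trainingData/marked"

mismatches = check_label_location(config, written_path)
for name, location in mismatches:
    print(f"{name}: config points to {location}, labels were written to {written_path}")
```

If the check reports a mismatch, aligning the two paths before rerunning trainMatch would at least remove one variable.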
Unfortunately, same error again.
Thank you for your response! I updated the environment, cloned the branch and tried to compile, but this time I got:
66.48 [ERROR] Failed to execute goal on project zingg-common-core: Could not resolve dependencies for project zingg:zingg-common-core:jar:0.4.0
66.48 [ERROR] dependency: zingg:zingg-common-client:jar:0.4.0 (compile)
66.48 [ERROR] Could not find artifact zingg:zingg-common-client:jar:0.4.0 in Central Repository (https://repo1.maven.org/maven2/)
66.48 [ERROR] Could not find artifact zingg:zingg-common-client:jar:0.4.0 in SparkPackagesRepo (https://repos.spark-packages.org/)
66.48 [ERROR] Could not find artifact zingg:zingg-common-client:jar:0.4.0 in central (https://repo.maven.apache.org/maven2)
I'm using Zingg 0.3.4, which is the proper version for Glue 4.0 according to this article: https://aws.amazon.com/blogs/big-data/entity-resolution-and-fuzzy-matches-in-aws-glue-using-the-zingg-open-source-library/