ZettaDataNet at DEV-TO-DEV Summit 2016

We are presenting Tuktu on Feb. 23 and 24 at the Dev-to-Dev Summit, San Francisco.

The expo is at Spur, 654 Mission; come by if you are in town.


Actions & Facts Revisited

When we initially introduced our data fragment model, we left some details out for the sake of simplicity because they were not essential to understanding the principles on which the data integration approach is based. However, these details are necessary to make the approach work in practice.

Data fragments correspond to facts about resources that are collected during data acquisition actions. Each data fragment is used to store both the data itself (i.e., an atomic piece of information about a resource) and metadata about it (e.g., where, when, by whom, using what tool this data was collected, but also what it represents and how it is represented). This metadata simplifies data integration and improves data quality by making it possible to centralize and automate the detection of inconsistencies and missing information.

This post takes a closer look at the data types of the different data elements that compose data fragments.

The “Data Acquisition Action” Part

Data acquisition actions occur when tools are used to acquire data from data sources, a process that is supported by a Data Acquisition Architecture. In this architecture, each data acquisition tool has a unique identifier, data sources are described in a Data Source Registry, and data acquisition tools register each of their data acquisition actions in a Data Acquisition Action Registry.

In the fragment data model, data acquisition actions are characterized by:

  • An action identifier (e.g., action #1): This is a unique identifier generated by the Data Acquisition Action Registry when a tool registers an action. These identifiers are sortable so that it is possible to order them from the first action to the last one. The current version of the Acquisition Action Registry implements action identifiers as long integers;
  • A timestamp (e.g., 2015-04-02T08:53:02Z): This should correspond to the moment at which an action is successfully completed. In practice, this is the time at which the action is registered. Timestamps are expressed according to the ISO-8601 standard;
  • A tool (e.g., Web Scraper #24): This is the unique identifier of the tool in the architecture. It is a character string; and
  • A data source (e.g., a given data set from http://census.gov): This is the unique identifier of the source in the Data Source Registry. It is expressed as a character string.

As shown above, the types of the 4 data acquisition action fields are well defined, and their domains (i.e., the authorized values for these fields) strictly consist of lists managed in other systems (e.g., the Data Acquisition Action Registry and the Data Source Registry).

The “Resource” Part

Resources (i.e., the ‘things’ of interest that are described by the facts collected during the data acquisition actions) are characterized by:

  • A resource type (e.g., product, customer, geographical area): Supported resource types are defined in a dedicated resource type controlled vocabulary managed in a vocabulary bank. In practice, this field contains a term identifier from the resource type vocabulary. It is expressed as a character string.
  • A unique identifier: Resource unique identifiers are generated and managed by the Identifier Service taking into account the type of the resource to identify. In the current implementation, it consists of a long integer.

Here also, the types of the 2 resource fields are well defined and their domains strictly consist of lists managed in two other systems (i.e., the Vocabulary Bank and the Identifier Service).

The “Fact” Part

When initially introduced, facts (about resources) were described with 3 fields: a fact property, a fact value, and a fact timestamp. For example, a building (the resource) had a value (the fact property) of 1,000,000 (the fact value) on Jan. 1, 2015 (the fact timestamp). In this simplified representation, information such as the type of the value (a money amount) or its unit (US$) was omitted. It was considered implicit and dependent on the property itself.

This is problematic for several reasons:

  1. When collecting information from several sources, it is the responsibility of each data acquisition tool to ensure that the correct type and unit are used. This is complex and ambiguous. For example, if I collect an amount in euros, what exchange rate does the tool use for the conversion, where does it get it, and, more importantly, what will this amount mean in 6 months?
  2. Some values are free texts for which it is valuable to mention the language. Others are controlled vocabulary terms for which it is important to identify the source vocabulary.

This is why, in the full version of the fragment data model, facts are characterized by:

  • A fact property: Supported fact property names are defined in a dedicated fact property name controlled vocabulary managed in a vocabulary bank. In practice, this field contains a term identifier from the fact property name vocabulary. It is expressed as a character string.
  • A fact type: Supported fact types are defined in a dedicated fact type controlled vocabulary managed in a vocabulary bank. In practice, this field contains a term identifier from the fact type vocabulary. It is expressed as a character string. Examples of fact types include: vocabulary term, language string, money amount, area, etc.
  • A fact value context: Fact value contexts provide additional information about fact values and depend on the type of the fact. For example, the context of a vocabulary term will be the vocabulary the term comes from, the context of a language string will be the language of the string, the context of a money amount will be the currency of the amount, the context of an area will be the unit in which its surface is expressed, etc. For most fact types, supported fact value contexts are defined in a controlled vocabulary specific to this type. In practice, this field contains a term identifier from the fact value context vocabulary specific to the fact type considered. It is expressed as a character string.
  • A fact value: This is the actual value of the fact. As just described, its type and domain depend on the fact type field.
  • A fact timestamp: This corresponds to the time at which the fact value applied (e.g., the date on which the value of a building was appraised at $1,000,000, or the period during which a person was 35 years of age). Fact timestamps are expressed according to the ISO-8601 standard.
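
Putting the three parts together, a data fragment therefore carries 11 fields. As a minimal sketch (the Python field names below are ours, not a normative schema), the structure and the types described above can be summarized as follows:

from collections import namedtuple

# Sketch of the 11 fields of an action-fact fragment (illustrative field
# names; the comments summarize the types and domains described above).
Fragment = namedtuple( 'Fragment', [
    # "Data acquisition action" part
    'action_identifier',    # long integer, from the Data Acquisition Action Registry
    'action_timestamp',     # ISO-8601 string, time at which the action was registered
    'tool',                 # string, unique identifier of the acquisition tool
    'data_source',          # string, identifier from the Data Source Registry
    # "Resource" part
    'resource_type',        # string, term identifier from the resource type vocabulary
    'resource_identifier',  # long integer, from the Identifier Service
    # "Fact" part
    'fact_property',        # string, term identifier from the fact property name vocabulary
    'fact_type',            # string, term identifier from the fact type vocabulary
    'fact_value_context',   # string, term identifier from the fact-type-specific context vocabulary
    'fact_value',           # actual value, type and domain depend on fact_type
    'fact_timestamp',       # ISO-8601 string, time at which the value applied
] )

A namedtuple is used here only to keep the sketch compact; any record-like structure would do.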

Example

The table below provides a commented example of an action-fact data fragment. It contains 11 rows, one for each fragment field. The first column provides the field name. The second column indicates the type/domain of the field. The third column provides the actual value of the field for the fragment example considered. Finally, the fourth column provides additional comments or explanations.

[Table: commented example of an action-fact data fragment (11 rows, one per field)]
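
Since the original table is not reproduced here, the sketch below combines the illustrative values used throughout this post into a single hypothetical fragment; the vocabulary term identifiers, the data source identifier, and the resource identifier are made up for the example:

# Hypothetical action-fact fragment combining the examples used in this post
# (the term identifiers, source identifier, and resource identifier are made up).
example_fragment = {
    'action_identifier':   1,                         # action #1
    'action_timestamp':    '2015-04-02T08:53:02Z',
    'tool':                'Web Scraper #24',
    'data_source':         'source:census.gov/dataset',  # made-up source identifier
    'resource_type':       'term:building',              # made-up term identifier
    'resource_identifier': 81672,                        # made-up Identifier Service id
    'fact_property':       'term:value',                 # made-up term identifier
    'fact_type':           'term:money-amount',          # made-up term identifier
    'fact_value_context':  'term:USD',                   # made-up term identifier
    'fact_value':          1000000,
    'fact_timestamp':      '2015-01-01T00:00:00Z',
}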

Conclusion

Looking at the 11 fields of a data fragment, the “resource identifier”, “fact property”, and “fact value” fields are the actual data. They correspond to the information found in the subject-predicate-object triples used by the semantic web to make statements about resources (e.g., area 94114 has buildings for a total value of $8,508,810,400). The 8 other fields are metadata. They make explicit what the described resource is, how information (i.e., facts) about this resource was collected, what these facts are, and what they represent. This metadata simplifies data integration by making it possible to automatically detect inconsistencies (e.g., values expressed in incompatible units) and missing information (e.g., a string in an unknown language). Explicit metadata thus makes simpler, more efficient code and improved data quality possible.


A Hands-on Introduction to MapReduce in Python

MapReduce is simple. Some MapReduce algorithms can definitely be more difficult to write than others, but MapReduce as a programming approach is easy. However, people usually struggle the first time they are exposed to it. In our opinion, this comes from the fact that most MapReduce tutorials focus on explaining how to build MapReduce algorithms. Unfortunately, the way MapReduce algorithms are built (i.e., by building a mapper and a reducer) is not necessarily the best way to explain how MapReduce works because it tends to overlook the ‘shuffle’ step, which occurs between the ‘map’ step and the ‘reduce’ step and is usually made transparent by MapReduce frameworks such as Hadoop.

This post is an attempt to present MapReduce the way we would have liked to be introduced to it from the start.

Before you start

This tutorial assumes a basic knowledge of the Python language.

You can download the code presented in the examples below from here. In order to run it, you need:

  • A Unix-like shell (e.g., bash, tcsh) as provided on Linux, Mac OS X, or Windows + Cygwin;
  • Python (e.g., Anaconda); note that the examples presented here can be ported to any other language that can read data from the standard input and write to the standard output.

In addition, if you want to run your code on Hadoop, you will need to install Hadoop 2.6, which requires Java to run.

Counting Word Occurrences

Counting the number of occurrences of words in a text is sometimes considered the “Hello world!” equivalent of MapReduce. A classical way to write such a program is presented in the Python script below. The script is very simple: it parses the standard input, extracts and counts the words it finds, and stores the result in a dictionary that uses words as keys and their numbers of occurrences as values.

[Screenshot: the counter.py script]

Let’s look at this script in more detail. It uses 2 packages: ‘sys’, which allows for reading data from the standard input, and ‘re’, which provides support for regular expressions.

import sys
import re

Before the parsing starts, a dictionary, named ‘sums’, which will be used to store the word counts, is created and initialized.

sums = {}

The standard input is read line by line and put in a variable named ‘line’

for line in sys.stdin:

For each line, a regular expression is used to remove all the leading and trailing non-word characters (e.g., blanks, tabs, punctuation)

    line = re.sub( r'^\W+|\W+$', '', line )

Then a second regular expression is used to split the line (on non-word characters) into words. Note that removing the leading and trailing non-word characters before splitting the line on these non-word characters lets you avoid the creation of empty words. The words obtained this way are put in a list named ‘words’.

    words = re.split( r'\W+', line )

Then, for each word in the list

    for word in words:

The lower-case version of the word is taken.

        word = word.lower()

Then the word is used as a key to access the ‘sums’ dictionary and increment its value (i.e., the word occurrence count) by one. Note that taking the lower-case version of each word avoids counting the same word as different words when it appears with different combinations of upper- and lower-case characters in the parsed text.

        sums[word] = sums.get( word, 0 ) + 1

Finally, once the whole text has been parsed, the content of the dictionary is sent to the standard output.

print( sums )
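
Put together (with a shebang line added so that the file can be run directly), the complete counter.py script assembled from the snippets above looks like this:

#!/usr/bin/env python

import sys
import re

# Dictionary used to store the word counts.
sums = {}

for line in sys.stdin:
    # Remove leading and trailing non-word characters, then split into words.
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split( r'\W+', line )
    for word in words:
        word = word.lower()
        sums[word] = sums.get( word, 0 ) + 1

print( sums )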

Now that the script is ready, let’s test it by counting the number of word occurrences in Moby Dick, the novel by Herman Melville. The text can be obtained here.

wget https://www.gutenberg.org/cache/epub/2701/pg2701.txt
mv pg2701.txt input.txt

Its content can be displayed using the cat command. As seen in the screenshot below, it ends with a notice describing Project Gutenberg.

cat input.txt

[Screenshot: output of “cat input.txt”, ending with the Project Gutenberg notice]

Assuming that the Python script was saved in a file named counter.py, which was made executable (chmod +x counter.py), and that the text containing the words to count was saved in a file named input.txt, the script can be launched using the following command line:

./counter.py < input.txt

The result of this is partly displayed in the screenshot below where we can see that the word ‘own’ appears 205 times in the text, the word ‘polished’ appears 7 times, etc.

[Screenshot: partial output of “./counter.py < input.txt”]

Limitations of the Dictionary Approach

The main problem with this approach comes from the fact that it requires the use of a dictionary, i.e., a central data structure used to progressively build and store all the intermediate results until the final result is reached.

Since the code we use can only run on a single processor, the best we can expect is that the time necessary to process a given text will be proportional to the size of the text (i.e., that the number of words processed per second is constant). In reality, the performance degrades as the size of the dictionary grows. As shown in the diagram below, the number of words processed per second diminishes when the size of the dictionary reaches the size of the processor data cache (note that if the cache is structured in several layers of different speeds, the processing speed will decrease each time the dictionary reaches the size of a layer). The processing speed drops again when the dictionary reaches the size of the Random Access Memory. Eventually, if the dictionary continues to grow, it will exceed the capacity of the swap and an exception will be raised.

[Diagram: words processed per second versus dictionary size]

The MapReduce Approach

The MapReduce approach does not require a central data structure. It consists of 3 steps:

  1. A mapping step that produces intermediate results and associates them with an output key;
  2. A shuffling step that groups intermediate results associated with the same output key; and
  3. A reducing step that processes groups of intermediate results with the same output key.

This might seem a little abstract so let’s look at what these three steps look like in practice when applied to our example.

Mapping

The mapping step is very simple. It consists of parsing the input text, extracting the words it contains, and, for each word found, producing an intermediate result that says: “this word occurs one time”. Translated into an output key / intermediate result format, this is expressed by using the word itself as output key and the number of occurrences of the word (in this specific case ‘1’) as intermediate result.

The code of the mapper is provided below:

[Screenshot: the mapper.py script]

The beginning of the script is identical to the initial script. The standard input is read line by line and split into words.

Then, for each word in the list

    for word in words:

The lower-case version of the word is sent to the standard output followed by a tabulation and the number 1.

        print( word.lower() + "\t1" )

The word is used as output key and “1” (i.e., the number of occurrences of this word) is the intermediate value. Following a Hadoop convention, the tabulation character (“\t”) is used to separate the key from the value.

As in the initial script, taking the lower-case version of each word avoids counting the same word as different words when it appears with different combinations of upper- and lower-case characters in the parsed text.
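
Assembled from the snippets above (again with a shebang line added so that the file can be run directly), the complete mapper.py script is:

#!/usr/bin/env python

import sys
import re

for line in sys.stdin:
    # Remove leading and trailing non-word characters, then split into words.
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split( r'\W+', line )
    for word in words:
        # Emit the word as output key and 1 as intermediate value.
        print( word.lower() + "\t1" )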

Assuming that the mapping script is saved in a file named mapper.py, the script can be launched using the following command line:

./mapper.py < input.txt

The result of this is partly displayed in the screenshot below where we can see that the words are sent in the order they appear in the text, each of them followed by a tabulation and the number 1.

[Screenshot: partial output of “./mapper.py < input.txt”]

Shuffling

The shuffling step consists of grouping all the intermediate values that have the same output key. Since, in our example, the mapping script runs on a single machine using a single processor, the shuffling simply consists of sorting the output of the mapping. This can easily be achieved using the standard “sort” command provided by the operating system:

./mapper.py < input.txt | sort

The result of this is partly displayed in the screenshot below. It differs from the output of mapper.py by the fact that the keys (i.e., the words) are now sorted in alphabetic order.

[Screenshot: partial output of “./mapper.py < input.txt | sort”]

Reducing

Now that the different values are ordered by key (i.e., the different words are listed in alphabetical order), it becomes easy to count the number of times each word occurs by summing values as long as they have the same key and publishing the result once the key changes.

[Screenshot: the reducer.py script]

Let’s look at the reducing script in more detail. Like the other scripts, it uses the package ‘sys’, which allows for reading data from the standard input.

import sys

Before the parsing of the standard input starts, two variables are created: ‘sum’, which will be used to store the count of the current word and is initialized to 0, and ‘previous’, which will be used to store the previous key and is initialized to None.

previous = None
sum = 0

Then the standard input is read line by line and put in a variable named ‘line’

for line in sys.stdin:

Then, the key and the value are extracted from each line by splitting the line on the tab character.

    key, value = line.split( '\t' )

The current key (key) is compared to the previous one (previous).

    if key != previous:

If the current and previous keys are different, it means that a new word has been found and that, unless the previous key is None (i.e., unless the script is handling the very first line of the standard input), the count of the previous word is complete.

        if previous is not None:

The variable ‘sum’ contains the total number of occurrences of the previous word, and this result can be displayed (note that the output is also tabulation-separated, here with the count followed by the word).

            print( str( sum ) + '\t' + previous )

As a new word has been found, the working variables must be reinitialized

        previous = key
        sum = 0

And, in all cases, the loop iteration ends by adding the current ‘value’ to ‘sum’

    sum = sum + int( value )

Finally, after having exited the loop, the last result can be displayed

print( str( sum ) + '\t' + previous )
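
Assembled from the snippets above, the complete reducer.py script is:

#!/usr/bin/env python

import sys

previous = None   # previous key (word)
sum = 0           # count of the current word

for line in sys.stdin:
    key, value = line.split( '\t' )

    if key != previous:
        # A new word has been found: publish the previous count, if any,
        # and reinitialize the working variables.
        if previous is not None:
            print( str( sum ) + '\t' + previous )
        previous = key
        sum = 0

    sum = sum + int( value )

# Publish the count of the last word.
print( str( sum ) + '\t' + previous )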

Assuming that the reducing script is saved in a file named reducer.py, the full MapReduce job can be launched using the following command line:

./mapper.py < input.txt | sort | ./reducer.py

The result of this is partly displayed in the screenshot below where we can see one word per line with each word preceded by the number of times it occurs in the text.

[Screenshot: partial output of “./mapper.py < input.txt | sort | ./reducer.py”]

MapReduce Frameworks

In the example above, which runs on a single processing unit, Unix pipes and the sort command are used to group the mapper’s output by key so that it can be consumed by the reducer.

./mapper.py < input.txt | sort | ./reducer.py

Similarly, when using a cluster of computers, a MapReduce framework such as Hadoop is responsible for redistributing data (produced in parallel by mappers running on the different cluster nodes using local data) so that all data with the same output key ends up on the same node to be consumed by a reducer.

As shown by the bash code below, the mapper.py and reducer.py scripts presented above do not need to be changed in order to run on Hadoop.

[Screenshot: bash commands running the MapReduce job on Hadoop]
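
For reference, the command below is a sketch of how such a job can be launched with Hadoop streaming; the exact path and version of the streaming jar depend on your Hadoop installation:

# Sketch of a Hadoop streaming invocation (adjust the jar path to your
# Hadoop 2.6 installation).
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input.txt \
    -output ./output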

Note that:

  • The output will appear in a file automatically created in directory ‘./output’ (Hadoop will fail if this directory already exists);
  • Environment variable HADOOP_HOME must point to the directory in which Hadoop has been uncompressed.

We hope this tutorial simplifies your introduction to MapReduce by making the shuffle step explicit. If you know other ways to learn MapReduce, we would be interested to hear about them.


Data Integration, Part 4: Architectures

This post concludes our series on data integration by reviewing the architectural components necessary to support the data acquisition and integration approach presented earlier.

Data Acquisition Action Registry

Each data fragment in the fragment store was added as the result of a data acquisition action (see part 1). In order to keep track of these actions, each application that brings new fragments into the store (e.g., web scrapers, data extractors, data curation tools) has to register every data acquisition action in a Data Acquisition Action Registry. The registry ensures that each registered action is uniquely identified by providing the action identifiers required by the Action-Fact Fragment data model.

Data Source Registry

The Data Source Registry is a catalog service that provides up-to-date information about the data sources being integrated. This information is machine-readable and describes, for each data source:

  • Its location;
  • The application program interfaces (APIs) it supports;
  • The data available: the types of resources described, the list of record fields provided for each resource, and how each of these fields can be mapped to a fact property name and value.

Data acquisition systems (e.g., data scrapers, data extractors) use the Data Source Registry to discover, connect to, and query data sources automatically.
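
As an illustration only, a machine-readable registry entry might look like the sketch below; the field names and values are hypothetical and do not reflect the registry’s actual schema:

# Hypothetical Data Source Registry entry (illustrative field names and values).
census_population_source = {
    'identifier':    'source:census.gov/population-estimates',   # unique source id
    'location':      'http://census.gov',                        # where the data lives
    'apis':          [ 'file-download', 'rest' ],                # supported APIs
    'resource_type': 'term:geographical-area',                   # type of resources described
    'field_mappings': {
        # record field -> fact property name and fact type
        'population_2012': { 'fact_property': 'term:population',
                             'fact_type':     'term:integer' },
    },
}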

Identifier Service

All the resources described in the fragment store are uniquely identified. The Identifier Service is responsible for identifying (i.e., providing a unique identifier to) the resources described in incoming data records. The identification of resources is specific to the resource type and can be more or less complex depending on the existence of a “natural” identifier for the type of resource considered and the presence of this identifier in the data collected. For example, it is easy to identify geographic areas with zip codes, especially when these codes are part of the incoming records. For resource types for which no natural identifier exists, the Identifier Service has to maintain its own identifier scheme. In that case, it is necessary to compare each new incoming record to existing records in order to decide if it corresponds to a new resource that requires a new identifier or to an existing resource for which an existing identifier can be reused.
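
The sketch below illustrates the two identification strategies with a hypothetical interface; real record matching is, of course, much more involved:

# Hypothetical sketch of the two identification strategies.
next_identifier = 1000   # counter used when no natural identifier exists
known_resources = {}     # matching key -> identifier already assigned

def identify( resource_type, record ):
    global next_identifier
    if resource_type == 'geographical area' and 'zip' in record:
        # A natural identifier exists and is present in the record: reuse it.
        return record['zip']
    # No natural identifier: compare the record to known resources and
    # reuse or create an identifier (the matching key here is simplistic).
    key = ( resource_type, record.get( 'name' ) )
    if key not in known_resources:
        known_resources[key] = next_identifier
        next_identifier = next_identifier + 1
    return known_resources[key]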

Vocabulary Bank

As seen previously, facts are basic properties of resources characterized by a property name and a property value.
In order to avoid ambiguities and deal more efficiently with multilingualism, property names and as many property values as possible come from controlled vocabularies managed in a vocabulary bank.
 
We explained the rationale for using controlled vocabularies and managing them in a vocabulary bank in a previous post.

Data Acquisition Architecture

The diagram below shows an architecture that supports the data acquisition workflow introduced in part 2.

[Diagram: data acquisition architecture]

Data is pulled into the workflow using a Data Extractor and/or a Web Scraper. Both of them rely on the Data Source Registry to find and query their targets and on the Data Acquisition Action Registry to register their data acquisition actions and obtain action ids. Once this is done, these systems push the newly collected data into a Data Receiver that queues it until it is consumed by a Data Ingestor. The Data Ingestor caches the data, splits it into records, and calls the Identifier Service to identify the resources described in the records. Once the records have been identified, the Data Fragmentor consults:

  • The Data Source Registry to know how to map record fields into fact properties and
  • The Vocabulary Bank to normalize the fact values.

Finally, the Data Fragmentor breaks the records into action-fact fragments and stores them in a Fragment Store.
 
Note that this architecture can easily be extended to support push scenarios with data sources publishing their data directly to the Data Receiver.

Data Integration and Curation Architecture

The diagram below shows how applications can use the data contained in the Fragment Store.

[Diagram: data integration and curation architecture]

Except for the Data Integrator, most applications do not use data fragments directly. Rather, they access data via optimized views produced by a View Builder. As the data contained in the Fragment Store is normalized (i.e., it only contains term identifiers), these applications might also have to look up these term identifiers in the Vocabulary Bank before the data can be presented to end users.
 
In addition to reading data from the Fragment Store via the View Builder, applications that also write data to the Fragment Store, such as the Data Curation Application in the diagram, have to register their data acquisition actions in the Data Acquisition Action Registry before adding their data as action-fact fragments directly to the Fragment Store.
 
As data integration occurs at the fragment level, the Data Integrator works directly with fragments (i.e., it reads and writes them directly) and only needs to register its data acquisition actions in the Registry; it does not have to access the Vocabulary Bank or the View Builder.


Data Integration, Part 3: Data Integration, Data Curation, and Data Views

In recent posts, we have introduced a data acquisition workflow during which data from multiple sources are collected and used to uniquely identify resources of interest and fragment their descriptions into normalized action-fact data fragments. Once this is done, data integration and curation can begin.

In our approach, both activities (i.e., data integration and data curation) consist of data acquisition actions. In fact, data acquisitions (i.e., the creation of data via the addition of new action-fact fragments) are the only actions allowed: existing data fragments are never updated or deleted, unless there is a legal obligation to delete information, for example under data privacy laws.

Data Integration

Data integration occurs at the action-fact fragment level. It is required when two or more fragments are inconsistent. The table below shows an example of such an inconsistency: two different actions (actions #1 and #2) have led to the acquisition, from two different sources (sources #13 and #6), of two incompatible values (32100 and 30100) for the population of area 94114 on July 1, 2012.

[Table: two fragments with incompatible population values for area 94114]
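
As a minimal sketch (assuming fragments are available as Python dictionaries with illustrative field names), such conflicts can be detected automatically by grouping fragments on the resource, fact property, and fact timestamp and flagging groups that contain more than one value:

# Sketch: detect fragments that report different values for the same
# resource, fact property, and fact timestamp (field names are illustrative).
def find_inconsistencies( fragments ):
    values = {}
    for f in fragments:
        key = ( f['resource_identifier'], f['fact_property'], f['fact_timestamp'] )
        values.setdefault( key, set() ).add( f['fact_value'] )
    # Return the keys for which more than one distinct value was acquired.
    return [ key for key, vals in values.items() if len( vals ) > 1 ]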

Once such an inconsistency is detected, there are several ways to resolve it:

  1. Do nothing (and let each application handle the inconsistency);
  2. Select one of the possible values, for example by deciding that data source #13 is more reliable than data source #6 or that the data acquired most recently is more likely to be correct;
  3. Derive a new value from existing values, for example by deciding that the real value of the population is the average of the values provided by sources #13 and #6;
  4. Acquire the correct value from a third source.

Unless one decides to do nothing, the data integration results in a new acquisition action.

[Table: fragment added by the data integration action that resolves the inconsistency]

Data Curation

Like data integration, data curation occurs at the action-fact fragment level and consists of creating, removing, or updating facts about resources by adding new fragments.

[Table: fragments recording the addition, modification, and removal of a fact]

In the example above, a new fact was added on Jan. 24, 2015: “the population of area 94113 was 15,000 on Jul. 1, 2012.” It was then modified on Jan. 25, 2015 to: “the population of area 94113 was 17,000 on Jul. 1, 2012.” Finally, this fact was removed on Jan. 26, 2015.

Data Views

Although data fragments are a powerful way to store, integrate, and manage data, it is generally more efficient for applications to consume data presented in a more optimized way, such as the faceted descriptions we introduced in a previous post. These descriptions are obtained by building views.

Views are read-only constructions built from data fragments. They are expendable (i.e., they can be rebuilt from the fragments at any time) and present data in a way optimized for the specific applications they are built for.

In addition to application-specific views, this flexible approach allows one to easily build special views such as:

  • State: All the facts at a given timestamp;
  • Fact trending: The evolution of a given fact over time; or
  • Action visualization: All the facts acquired by a given action.

Note that, because views have access to all the fragments, an application can be set to use a view built from already integrated/curated data or a view built from data that has not yet been integrated/curated. The latter leaves the responsibility of the integration/curation to the application.


Data Integration, Part 2: Data Acquisition Workflow

In this post, we will look at the workflow that leads from the acquisition of raw data about resources to its storage as the action-fact data fragments introduced in part 1.

[Diagram: data acquisition workflow]

As depicted in the diagram above, this workflow consists of 4 main steps:

  1. Data acquisition and caching,
  2. Identification,
  3. Normalization, and
  4. Fragmentation and storage.

Data Acquisition

The data acquisition step consists of acquiring data from a data source. This can involve calling an API, scraping a website, querying a database, running OCR on digitized documents, or any other data acquisition action considered a relevant way to bring new data into the workflow. Once this raw data is acquired, it is immediately time-stamped and stored, either as-is or after transformation into JSON records, in a cache repository. This caching mechanism makes it possible to adapt and replay the workflow at any time without redoing the data acquisition step (and without having to worry about possible changes of state that may have occurred at the source).

Identification

The identification step consists of identifying the resources described in the incoming data records. Depending on the resource type, the data available, and the identifier, this task can vary from rather simple to highly complex and demanding. For example, it is relatively easy to identify geographic areas by their zip codes, even if it might involve looking up incomplete addresses or GPS coordinates. Identifying the resources described in bibliographical records from different sources, on the other hand, can prove tedious as it potentially requires assigning and maintaining an identifier for each resource and involves comparing each new incoming record to the ones already acquired.

Normalization

As explained in a previous post, controlled vocabularies are a valuable alternative to free text because their meaning is explicit, which makes it possible to avoid the inevitable ambiguities inherent in natural language. The normalization step consists of replacing the properties and values found in incoming records with the corresponding identifiers from data dictionaries and controlled vocabularies, for example replacing country names (e.g., Belgium) with ISO 3166-1 codes (e.g., “be”).
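
As a minimal sketch (with a hard-coded lookup table standing in for the Vocabulary Bank), the country-name example could be implemented as follows:

# Sketch: replace free-text country names by ISO 3166-1 alpha-2 codes.
# In practice the mapping would be looked up in the Vocabulary Bank.
ISO3166_1 = { 'Belgium': 'be', 'France': 'fr', 'Germany': 'de' }

def normalize_country( record ):
    country = record.get( 'country' )
    if country in ISO3166_1:
        record['country'] = ISO3166_1[country]
    return record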

Fragmentation and Storage

Finally, the fragmentation step consists of breaking each normalized record into actions and facts and storing the resulting action-fact fragments. One fragment is created for each field in the record. This requires identifying the facts that correspond to the record fields. It also requires that each fragment be completed with information about the data acquisition action (i.e., action id, data source, and tool id).
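
As a minimal sketch (with illustrative field names, and with the fact type and fact value context fields omitted for brevity), the fragmentation itself can be as simple as:

# Sketch: break a normalized record into one action-fact fragment per field.
# The 'action' and 'resource' arguments carry the action and resource parts
# described in part 1; fact type and fact value context are omitted here.
def fragment_record( action, resource, record, fact_timestamp ):
    fragments = []
    for fact_property, fact_value in record.items():
        fragments.append( {
            'action':         action,
            'resource':       resource,
            'fact_property':  fact_property,
            'fact_value':     fact_value,
            'fact_timestamp': fact_timestamp,
        } )
    return fragments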

When the fragments are ready, they are stored in a fragment store where they can be further integrated, curated, and used, as we will see in an upcoming post.
