Wednesday, March 20, 2013

Parallelism in SAP Sybase ASE

Here are some random notes on parallelism in SAP Sybase ASE.

Parallel execution in ASE is implemented by the EXCHANGE operator, which marks the boundary between producer and consumer processes. The consumer process reads data from a pipe.

Parallelism implementations:

Table Scan:
For an unpartitioned table, a hash-based table scan is used. Each worker process examines 1/k of the rows on each page, where k is the number of worker processes.
For a partitioned table, each partition is processed by one worker process.
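
A minimal sketch of the two scan strategies, using a toy table made of pages. The assignment rule (row position modulo k) and the function names are assumptions for illustration only; these notes do not say which hash ASE actually uses.

```python
# Toy model: a table is a list of pages, and each page is a list of rows.

def hash_based_scan(pages, k, worker_id):
    """Rows that worker `worker_id` (0..k-1) examines on an unpartitioned table."""
    picked = []
    for page_no, page in enumerate(pages):
        for slot, row in enumerate(page):
            # Assumed rule: hash the row position and keep 1/k of the rows.
            if (page_no + slot) % k == worker_id:
                picked.append(row)
    return picked

def partitioned_scan(partitions, worker_id):
    """On a partitioned table, each worker scans exactly one partition."""
    return list(partitions[worker_id])

pages = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
k = 3
per_worker = [hash_based_scan(pages, k, w) for w in range(k)]
```

Every row ends up with exactly one worker, so merging the workers' outputs gives the full table.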

Scalar aggregation:
The aggregation can be performed in two phases or serially:
- Two-phase: uses two scalar aggregate operators. The lower scalar aggregate operator aggregates the data stream in parallel. The partial results are then merged and aggregated a second time.
- Serial: the result of the parallel scan is merged first and then aggregated once.
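
The difference between the two strategies can be sketched with an average over partitioned data. This is an illustrative toy, not ASE's implementation; the function names are made up.

```python
def partial_sum_count(partition):
    """Lower aggregate: runs once per partition (in parallel in a real engine)."""
    return sum(partition), len(partition)

def two_phase_avg(partitions):
    """Upper aggregate: combines the per-partition partial results."""
    partials = [partial_sum_count(p) for p in partitions]
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

def serial_avg(partitions):
    """Serial variant: merge the scanned rows first, then aggregate once."""
    merged = [v for p in partitions for v in p]
    return sum(merged) / len(merged)

partitions = [[1, 2, 3], [4, 5], [6]]
```

Note that the second phase must combine sums and counts, not average the averages, which is why the lower operator returns both.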

Union all:

- Parallel: each of the operands must have the same degree of parallelism. The union all is then done in parallel and the result is merged.
- Serial: when selective predicates restrict the data so that the amount sent through the union all operator is small enough, only the scans are done in parallel. The merged scan results are sent to a serial union all.

Join:
- Two tables with the same useful partitioning
This is called an equipartitioned join. The join is done on each pair of partitions in parallel, and the results are merged. (A nested-loop join is often used.)

- One of the tables (tb2) with useful partitioning
The query optimizer dynamically repartitions the other table (tb1) to match the partitioning of tb2, using hash partitioning on the join column. The first exchange operator repartitions tb1 in parallel, and the second exchange operator handles the join. (A merge join is often used.)

- Both tables with useless partitioning
Both tables are repartitioned, so that the repartitioned operands of the join are equipartitioned with respect to the join predicate. (A hash join is often used.)

- Replicated join:
Used when a large table has a useful index on the joining column but useless partitioning, and it joins to a small table. The small table can be replicated N ways, where N is the number of partitions of the large (inner) table. Each partition of the large table is joined with a copy of the small table and, because no exchange operator is needed on the inner side of the join, an index nested-loop join is allowed.

- Parallel reformatting
Used when there is no useful index on the joining column, or when a nested-loop join is the only viable option.
The outer side may already have useful partitioning, or it can be repartitioned to create it. For the inner side of a nested-loop join, however, any repartitioning means that the table must be reformatted into a worktable that uses the new partitioning strategy, and an index must then be created on the joining column. The inner scan of the nested-loop join then accesses the worktable.
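
As an illustration of the "both tables with useless partitioning" case above, here is a minimal sketch of a repartitioned join: both inputs are hash-partitioned on the join column, and each pair of matching partitions is joined independently. The helper names and the row format (dicts) are assumptions for this example, not anything ASE exposes.

```python
from collections import defaultdict

def repartition(rows, key, n):
    """Hash-partition rows on the join column into n partitions."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row[key]) % n].append(row)
    return parts

def hash_join(left, right, key):
    """Join one pair of partitions: build a hash table on right, probe with left."""
    table = defaultdict(list)
    for r in right:
        table[r[key]].append(r)
    return [{**l, **r} for l in left for r in table.get(l[key], [])]

def repartitioned_join(tb1, tb2, key, n):
    """Equal keys land in the same partition number, so pairs join independently."""
    joined = []
    for p1, p2 in zip(repartition(tb1, key, n), repartition(tb2, key, n)):
        joined.extend(hash_join(p1, p2, key))  # one worker per pair in a real engine
    return joined

tb1 = [{"id": 1, "a": "x"}, {"id": 2, "a": "y"}, {"id": 3, "a": "z"}]
tb2 = [{"id": 2, "b": "p"}, {"id": 3, "b": "q"}, {"id": 4, "b": "r"}]
result = repartitioned_join(tb1, tb2, "id", 2)
```

The same `repartition` step applied to only one input corresponds to the case where the other table already has useful partitioning.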

Vector aggregation (group-by):
- In-partitioned vector aggregation
If the data is partitioned on a subset of the columns in the group by clause, or on exactly those columns, the grouping operation can be done in parallel on each of the partitions.

- Repartitioned vector aggregation
If the partitioning is not useful, the source data is repartitioned to match the grouping columns, and the vector aggregation is then applied in parallel.
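
A small sketch of repartitioned vector aggregation, assuming rows of the form (group, value) and a sum aggregate. After repartitioning on the grouping column, a group never spans two partitions, so the per-partition results can simply be combined. All names here are illustrative.

```python
from collections import Counter

def repartition_by_group(rows, n):
    """Send every row of a given group to the same partition."""
    parts = [[] for _ in range(n)]
    for group, value in rows:
        parts[hash(group) % n].append((group, value))
    return parts

def aggregate_partition(part):
    """Vector aggregation (sum per group) inside one partition."""
    sums = Counter()
    for group, value in part:
        sums[group] += value
    return dict(sums)

def repartitioned_group_sum(rows, n):
    result = {}
    for part in repartition_by_group(rows, n):
        # Safe to update without combining: groups are disjoint across partitions.
        result.update(aggregate_partition(part))
    return result

rows = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
totals = repartitioned_group_sum(rows, 2)
```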

Distinct
Handled the same way as group by.

Queries with an in list
An in (...) list is transformed into an or list. The values in the in list are put into a special in-memory table and sorted to remove duplicates. This table is then joined back with the base table using an index nested-loop join.
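
The in-list strategy can be sketched as follows, with a dict standing in for the index on the base table. The helper names and sample data are made up for illustration.

```python
def build_index(rows, column):
    """Toy index: column value -> list of matching rows."""
    index = {}
    for row in rows:
        index.setdefault(row[column], []).append(row)
    return index

def in_list_query(index, in_values):
    # In-memory table of in-list values, sorted with duplicates removed.
    worktable = sorted(set(in_values))
    result = []
    for value in worktable:  # outer side of the nested-loop join
        result.extend(index.get(value, []))  # index probe on the base table
    return result

rows = [{"id": 1, "city": "Rome"}, {"id": 2, "city": "Oslo"}, {"id": 3, "city": "Rome"}]
city_index = build_index(rows, "city")
matches = in_list_query(city_index, ["Rome", "Oslo", "Rome", "Lima"])
```

Removing duplicates first matters: without it, the repeated "Rome" would return the same rows twice.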

Queries with or clauses
The set of conjunctive predicates on each side of the disjunction must be indexable. Each side of the disjunction is applied separately to qualify a set of row IDs (RIDs), and a different index can be used for each side. The predicates may qualify overlapping sets of data rows, so the RID sets are merged to eliminate duplicates. Finally, the RIDs are joined back to the base table to get the results.
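
A rough sketch of the RID-based strategy for a query like `a = 1 or b = 20`, where a RID is simply the row's position in a list. The two dict indexes and the function names are assumptions for this example.

```python
def build_rid_index(rows, column):
    """Toy index: column value -> list of row IDs (positions in `rows`)."""
    index = {}
    for rid, row in enumerate(rows):
        index.setdefault(row[column], []).append(rid)
    return index

def or_query(rows, index_a, a_value, index_b, b_value):
    rids_a = index_a.get(a_value, [])  # qualify RIDs for the left side
    rids_b = index_b.get(b_value, [])  # qualify RIDs for the right side
    merged = sorted(set(rids_a) | set(rids_b))  # merge and eliminate duplicates
    return merged, [rows[rid] for rid in merged]  # join RIDs back to the table

rows = [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 1, "b": 20}, {"a": 3, "b": 30}]
index_a = build_rid_index(rows, "a")
index_b = build_rid_index(rows, "b")
rids, result = or_query(rows, index_a, 1, index_b, 20)
```

Row 2 satisfies both sides of the disjunction but appears only once in the result.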

Queries with an order by clause
If no inherent ordering is available, the query processor either repartitions an existing data stream or uses the existing partitioning scheme, then applies the sort to each of the constituent streams. The sorted streams are then merged.
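
The sort-then-merge idea can be sketched with Python's standard `heapq.merge`, which merges already-sorted inputs while preserving order. The function name is made up, and the per-stream sorts would run in parallel in a real engine.

```python
import heapq

def parallel_order_by(streams):
    # Step 1: sort each constituent stream independently (one worker each).
    sorted_streams = [sorted(stream) for stream in streams]
    # Step 2: order-preserving merge of the sorted streams.
    return list(heapq.merge(*sorted_streams))

streams = [[5, 1, 9], [4, 8], [7, 2]]
ordered = parallel_order_by(streams)
```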

Select into clauses
- Creates the new table using the columns specified in the select into statement.
- Creates N partitions in the new table, where N is the degree of parallelism that the optimizer chooses for the insert operation in the query.
- Populates the new table with query results, using N worker processes.
- Unpartitions the new table, if no specific destination partitioning is required.
If destination partitioning is specified:
- If the destination table has the same partitioning as the source data, and there is enough data to insert, the insert operator executes in parallel.
- If the source partitioning does not match that of the destination table, the source data must be repartitioned. Insert is done in parallel.

insert/delete/update
insert, delete, and update operations are executed serially.
However, tables other than the destination table can be accessed in parallel.

Partition elimination
The query processor can disqualify range, hash, and list partitions at compile time. With hash partitions, only equality predicates can be used, whereas for range and list partitions both equality and inequality predicates can be used to eliminate partitions.
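
To make the rule concrete, here is a hedged sketch of partition elimination for range and hash partitions. It assumes range partition i holds values up to and including `UPPER_BOUNDS[i]`, and that a hash partition is chosen by `hash(value) % n`; both the bounds and the hash rule are illustrative, not ASE's internals.

```python
import bisect

# Assumed layout: partition 0 holds values <= 100, partition 1 holds (100, 200], etc.
UPPER_BOUNDS = [100, 200, 300]

def range_partitions_for_eq(value):
    """col = value: at most one range partition can qualify."""
    i = bisect.bisect_left(UPPER_BOUNDS, value)
    return [i] if i < len(UPPER_BOUNDS) else []

def range_partitions_for_lt(value):
    """col < value: keep partitions whose lower bound is below value."""
    last = min(bisect.bisect_left(UPPER_BOUNDS, value), len(UPPER_BOUNDS) - 1)
    return list(range(last + 1))

def hash_partitions_for_eq(value, n):
    """col = value: the hash picks exactly one partition."""
    return [hash(value) % n]

def hash_partitions_for_lt(value, n):
    """An inequality says nothing about hash placement, so nothing is eliminated."""
    return list(range(n))
```

For example, `col < 150` only needs the first two range partitions, while on a hash-partitioned table every partition must still be scanned.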

Thursday, March 14, 2013

SAP DKOM 2013 talk: HANA Query Federation

At the SAP DKOM 2013 conference, most topics were about the HANA platform, which clearly reflects SAP's determination and effort to become the 2nd largest database company. Of all the topics, the one I found most interesting was HANA federation.

The SAP real-time data platform includes four major components: Transactional Data Management (Sybase ASE), In-Memory Innovations (HANA DB), Analytics EDW Data Management (Sybase IQ), and Mobile Data Management (SQL Anywhere). It may also contain a Complex Event Processing (CEP) module as a data source. Built on top of that are SAP's solutions, including Business Suites and Cloud/Mobile/Analytics Apps. Of course, the core of this platform is the HANA DB, which provides real-time analytics.

With so many different DBMSs underneath, an obvious and challenging question is how to federate access to all that data. SAP is developing solutions so that in HANA studio we can issue queries not only on HANA tables, but also on any kind of data source, as long as ODBC is supported. We can join HANA tables with tables from other systems such as Sybase ASE, IQ, or even Hadoop, and everything is transparent to the customer.

To achieve this, we first create proxy tables that link to the underlying data sources, for example Hadoop. The query plan then ships the corresponding operators to those servers, where (in the Hadoop case) they are translated into Hive scripts and run remotely. The results are transmitted back to HANA studio. In particular, filtering operators are also pushed to the remote server to minimize the amount of data transmitted.

Sometimes the remote server, for example Hadoop, may take quite some time to finish the job, and the optimizer on the main server currently has no estimate of how long that will be. As ongoing and future work, SAP is collaborating with Intel to estimate and monitor the running time of jobs on the remote server, so that the main server doesn't have to hang there waiting.

Saturday, March 09, 2013

Brief notes on Map Reduce 2 (YARN)

Modules:
YARN stands for “Yet-Another-Resource-Negotiator”. The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker: resource management and job scheduling/monitoring, into separate daemons:
- A ResourceManager (RM) that manages the global assignment of compute resources to applications.
- A per-application ApplicationMaster (AM) that manages the application’s life cycle.
This is a more isolated and scalable model than the MRv1 system.

The ResourceManager has two main components:
- Scheduler
Responsible for allocating resources to the various running applications, subject to the familiar constraints of capacities, queues, etc.

- ApplicationsManager (AsM)
Responsible for accepting job submissions, negotiating the first container for executing the application-specific AM, and providing the service for restarting the AM container on failure.

NodeManager (NM): per-node
The per-node slave of the ResourceManager, responsible for launching containers, monitoring their resource usage (CPU, memory, disk, network), and reporting it to the ResourceManager/Scheduler. The design also allows long-running, application-specific auxiliary services to be plugged into the NMs during startup. Shuffle is a typical auxiliary service loaded by the NMs.

ApplicationMaster (AM): per-application(job)
Responsible for negotiating appropriate resource containers from the Scheduler, tracking their status, and monitoring progress. Each AM manages the application's individual tasks, and each task runs within a container on some node.

Resource Allocation Process:
Resources are requested in the form of containers, where each container has a number of non-static attributes.

Client - RM
The client submits a request to the RM/AsM and, upon the RM's response, sends the application submission context.
The client can also ask the RM/AsM for an application report and receives it in response.

RM - AM
The AM registers itself with the RM, and the RM returns the resource status of the cluster.
The AM sends resource allocation requests to the RM, and the RM/Scheduler responds with a list of containers.

AM - NM
The AM sends container start requests to the NM.
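
The message flow above can be walked through with a toy simulation. The classes and method names below are hypothetical stand-ins written for this note; they are not the Hadoop API, and a container is reduced to a (node, id) pair.

```python
class ResourceManager:
    """Toy RM: hands out application ids and containers from a free list."""
    def __init__(self, free_containers):
        self.free = list(free_containers)
        self.apps = {}
        self.next_id = 1

    def get_new_application(self):  # Client - RM, step 1
        app_id = f"app_{self.next_id}"
        self.next_id += 1
        return app_id

    def submit_application(self, app_id, context):  # Client - RM, step 2
        self.apps[app_id] = {"context": context, "state": "ACCEPTED"}

    def get_application_report(self, app_id):  # Client - RM, status query
        return self.apps[app_id]["state"]

    def register_am(self, app_id):  # RM - AM, registration
        self.apps[app_id]["state"] = "RUNNING"
        return {"free_containers": len(self.free)}

    def allocate(self, app_id, count):  # RM - AM, container request
        granted, self.free = self.free[:count], self.free[count:]
        return granted

class NodeManager:
    """Toy NM: records the containers it was asked to start."""
    def __init__(self):
        self.running = []

    def start_container(self, container_id, command):  # AM - NM
        self.running.append((container_id, command))

rm = ResourceManager([("node1", "c1"), ("node2", "c2"), ("node1", "c3")])
nms = {"node1": NodeManager(), "node2": NodeManager()}

app_id = rm.get_new_application()
rm.submit_application(app_id, {"am_command": "run-am"})
stats = rm.register_am(app_id)
containers = rm.allocate(app_id, 2)
for node, container_id in containers:
    nms[node].start_container(container_id, "task")
```

The sketch leaves out the launch of the AM's own first container, heartbeats, and failure handling.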

Friday, March 08, 2013

Watch out for QR code security

Attacks are everywhere! I just noticed that there are malicious QR code attacks that might trick you into visiting a fake website or steal information from you. Because a QR code can encode only a limited amount of information, most of the attacks are either phishing or code injection. You really need to pay attention to the website you open and check that it's what you expected. Be especially cautious if you are prompted with a form asking for private info, or asked to install some package. The best prevention: only scan codes you trust, and ignore those fancy ads on the walls in the street :)