Skip to content

Graph

pgraf.graph

PGraf

Manage and Search the Property Graph.

The PGraf class is the main entry point for interacting with the graph database. It provides methods for adding nodes and edges, querying, traversing the graph, and performing vector similarity searches.

Parameters:

Name Type Description Default
url PostgresDsn

PostgreSQL connection URL

required
pool_min_size int

Minimum number of connections in the pool

1
pool_max_size int

Maximum number of connections in the pool

10
Source code in pgraf/graph.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class PGraf:
    """Manage and Search the Property Graph.

    The PGraf class is the main entry point for interacting with the graph
    database. It provides methods for adding nodes and edges, querying,
    traversing the graph, and performing vector similarity searches.

    Args:
        url: PostgreSQL connection URL
        pool_min_size: Minimum number of connections in the pool
        pool_max_size: Maximum number of connections in the pool
    """

    def __init__(
        self,
        url: pydantic.PostgresDsn,
        pool_min_size: int = 1,
        pool_max_size: int = 10,
    ) -> None:
        """Initialize a new PGraf instance.

        Args:
            url: PostgreSQL connection URL
            pool_min_size: Minimum number of connections in the pool
            pool_max_size: Maximum number of connections in the pool
        """
        self._embeddings = embeddings.Embeddings()
        self._postgres = postgres.Postgres(url, pool_min_size, pool_max_size)

    async def initialize(self) -> None:
        """Ensure the database is connected and ready to go."""
        await self._postgres.initialize()

    async def aclose(self) -> None:
        """Close the Postgres connection pool."""
        await self._postgres.aclose()

    async def add_node(
        self,
        labels: list[str],
        properties: dict | None = None,
        created_at: datetime.datetime | None = None,
        modified_at: datetime.datetime | None = None,
        mimetype: str | None = None,
        content: str | None = None,
    ) -> models.Node:
        """Add a node to the graph"""
        value = models.Node(
            labels=labels,
            properties=properties or {},
            mimetype=mimetype,
            content=content,
        )
        if created_at is not None:
            value.created_at = created_at
        if modified_at is not None:
            value.modified_at = modified_at
        async with self._postgres.callproc(
            'pgraf.add_node', value, models.Node
        ) as cursor:
            result: models.Node = await cursor.fetchone()  # type: ignore
        if value.content is not None:
            await self._add_embeddings(value.id, value.content)
        return result

    async def delete_node(self, node_id: uuid.UUID) -> bool:
        """Retrieve a node by ID"""
        async with self._postgres.callproc(
            'pgraf.delete_node', {'id': node_id}
        ) as cursor:
            result: dict[str, int] = await cursor.fetchone()  # type: ignore
            return result['count'] == 1

    async def get_node(self, node_id: uuid.UUID | None) -> models.Node | None:
        """Retrieve a node by ID"""
        async with self._postgres.callproc(
            'pgraf.get_node', {'id': node_id}, models.Node
        ) as cursor:
            if cursor.rowcount == 1:
                return await cursor.fetchone()  # type: ignore
            return None

    async def get_node_labels(self) -> list[str]:
        """Retrieve all of the node types in the graph"""
        return await self._get_labels('nodes')

    async def get_node_properties(self) -> list[str]:
        """Retrieve the distincty property names across all nodes"""
        return await self._get_properties('nodes')

    async def get_nodes(
        self, labels: list[str] | None = None, properties: dict | None = None
    ) -> typing.AsyncGenerator[models.Node, None]:
        """Get all nodes matching the criteria"""
        statement, parameters = self._build_statement(
            queries.GET_NODES, labels, properties
        )
        async with self._postgres.execute(
            statement, parameters, models.Node
        ) as cursor:
            async for row in cursor:
                yield models.Node.model_validate(row)

    async def update_node(self, node: models.Node) -> models.Node:
        """Update a node"""
        async with self._postgres.callproc(
            'pgraf.update_node', node, models.Node
        ) as cursor:
            result: models.Node = await cursor.fetchone()  # type: ignore
        if result.content is not None:
            await self._add_embeddings(result.id, result.content)
        return result

    async def add_edge(
        self,
        source: uuid.UUID,
        target: uuid.UUID,
        labels: list[str] | None = None,
        properties: dict | None = None,
    ) -> models.Edge:
        """Add an edge, linking two nodes in the graph"""
        value = models.Edge(
            source=source,
            target=target,
            labels=labels or [],
            properties=properties or {},
        )
        async with self._postgres.callproc(
            'pgraf.add_edge', value, models.Edge
        ) as cursor:
            return await cursor.fetchone()  # type: ignore

    async def delete_edge(self, source: uuid.UUID, target: uuid.UUID) -> bool:
        """Remove an edge, severing the relationship between two nodes

        Note: This is a directional operation. It only removes the edge going
        from source to target, not from target to source.
        """
        async with self._postgres.callproc(
            'pgraf.delete_edge', {'source': source, 'target': target}
        ) as cursor:
            result: dict[str, int] = await cursor.fetchone()  # type: ignore
            return result['count'] == 1

    async def get_edge(
        self, source: uuid.UUID, target: uuid.UUID
    ) -> models.Edge | None:
        """Retrieve an edge from source to target

        Note: This is a directional operation. It only retrieves the edge going
        from source to target, not from target to source.
        """
        async with self._postgres.callproc(
            'pgraf.get_edge', {'source': source, 'target': target}, models.Edge
        ) as cursor:
            if cursor.rowcount == 0:
                return None
            return await cursor.fetchone()  # type: ignore

    async def get_edges(
        self, labels: list[str] | None = None, properties: dict | None = None
    ) -> typing.AsyncGenerator[models.Edge, None]:
        """Get edges by criteria"""
        statement, parameters = self._build_statement(
            queries.GET_EDGES, labels, properties
        )
        async with self._postgres.execute(
            statement, parameters, models.Edge
        ) as cursor:
            async for row in cursor:
                yield models.Edge.model_validate(row)

    async def get_edge_labels(self) -> list[str]:
        """Retrieve all of the edge labels in the graph"""
        return await self._get_labels('edges')

    async def get_edge_properties(self) -> list[str]:
        """Retrieve all of the edge property names in the graph"""
        return await self._get_properties('edges')

    async def update_edge(self, edge: models.Edge) -> models.Edge:
        """Update an edge"""
        async with self._postgres.callproc(
            'pgraf.update_edge', edge, models.Edge
        ) as cursor:
            return await cursor.fetchone()  # type: ignore

    async def search(
        self,
        query: str,
        labels: list[str] | None = None,
        properties: dict | None = None,
        similarity_threshold: float = 0.1,
        limit: int = 10,
        offset: int = 0,
    ) -> list[models.SearchResult]:
        """Search the content nodes in the graph, optionally filtering by
        properties, node types, and the edges labels.

        """
        vector = self._embeddings.get(query)
        if len(vector) > 1:
            LOGGER.warning(
                'Search text embeddings returned %i vector arrays', len(vector)
            )
        async with self._postgres.callproc(
            'pgraf.search',
            {
                'query': query,
                'labels': labels,
                'properties': json.Jsonb(properties) if properties else None,
                'embeddings': vector[0],
                'similarity': similarity_threshold,
                'limit': limit,
                'offset': offset,
            },
            models.SearchResult,
        ) as cursor:
            results: list[models.SearchResult] = await cursor.fetchall()  # type: ignore
            return results

    async def traverse(
        self,
        start_node: uuid.UUID,
        node_labels: list[str] | None = None,
        edge_labels: list[str] | None = None,
        direction: str = 'outgoing',
        max_depth: int = 5,
        limit: int = 25,
    ) -> list[tuple[models.Node, models.Edge | None]]:
        """Traverse the graph from a starting node"""
        results: list[tuple[models.Node, models.Edge | None]] = []
        visited_nodes = set()  # Track visited nodes to avoid duplicates

        # Recursive helper function to implement depth-first traversal
        async def traverse_recursive(node_id, current_depth=0, path_edge=None):
            # Check the limit
            if len(results) >= limit:
                return

            # Check max depth
            if current_depth > max_depth:
                return

            # Check if we've visited this node
            if node_id in visited_nodes:
                return

            # Mark this node as visited
            visited_nodes.add(node_id)

            # Get the current node
            current_node = await self.get_node(node_id)
            if not current_node:
                return

            # Apply node label filtering
            if node_labels and not any(
                label in current_node.labels for label in node_labels
            ):
                # Only filter at depth > 0 to ensure starting node is included
                if current_depth > 0:
                    return

            # Add this node to results
            results.append((current_node, path_edge))

            if current_depth >= max_depth:
                return

            # Build SQL query based on direction
            if direction == 'outgoing':
                query = sql.SQL(
                    'SELECT * FROM pgraf.edges WHERE source = %(node_id)s'
                )
            elif direction == 'incoming':
                query = sql.SQL(
                    'SELECT * FROM pgraf.edges WHERE target = %(node_id)s'
                )
            else:  # both
                query = sql.SQL(
                    """\
                    SELECT *
                      FROM pgraf.edges
                     WHERE source = %(node_id)s
                        OR target = %(node_id)s
                    """
                )

            # Get all edges connected to this node
            async with self._postgres.execute(
                query, {'node_id': node_id}
            ) as cursor:
                edges = await cursor.fetchall()

                # Process each edge
                for edge_row in edges:
                    # Skip edges that don't match filter criteria
                    if edge_labels and not any(
                        label in edge_row['labels'] for label in edge_labels
                    ):
                        continue

                    # Create the edge model
                    edge = models.Edge(
                        source=edge_row['source'],
                        target=edge_row['target'],
                        labels=edge_row['labels'],
                        properties=edge_row['properties'],
                    )

                    # Determine the next node ID based on direction
                    next_id = edge_row['target']
                    if direction == 'incoming' or (
                        direction == 'both' and edge_row['target'] == node_id
                    ):
                        next_id = edge_row['source']

                    # Skip if it's the current node
                    if next_id == node_id:
                        continue

                    # Recursively traverse
                    await traverse_recursive(next_id, current_depth + 1, edge)

                    # Check if limit reached
                    if len(results) >= limit:
                        return

        await traverse_recursive(start_node)
        LOGGER.debug(
            'Traverse results: %s items, visited %s nodes',
            len(results),
            len(visited_nodes),
        )
        return results

    @staticmethod
    def _build_statement(
        select: str,
        labels: list[str] | None = None,
        properties: dict | None = None,
    ) -> tuple[sql.Composable, dict[str, typing.Any]]:
        """Generate the SQL for get_edges and get_nodes"""
        parameters: dict[str, typing.Any] = {}
        statement: list[str | sql.Composable] = [
            sql.SQL(select) + sql.SQL(' ')  # type: ignore
        ]
        if not labels and not properties:
            return sql.Composed(statement), parameters
        where: list[sql.Composable] = []
        if labels:
            parameters['labels'] = labels
            where.append(
                sql.SQL('labels') + sql.SQL(' && ') + sql.Placeholder('labels')
            )
        if properties:
            props = []
            for key, value in properties.items():
                props.append(
                    sql.SQL(f"properties->>'{key}'")  # type: ignore
                    + sql.SQL(' = ')
                    + sql.Placeholder(f'props_{key}')
                )
                parameters[f'props_{key}'] = str(value)
            if len(props) > 1:
                where.append(
                    sql.SQL('(') + sql.SQL(' OR ').join(props) + sql.SQL(')')
                )
            else:
                where.append(props[0])
        if where:
            statement.append(sql.SQL('WHERE '))
            statement.append(sql.SQL(' AND ').join(where))
        return sql.Composed(statement), parameters

    async def _get_labels(self, table: str) -> list[str]:
        """Dynamically construct the query to get distinct labels"""
        query = sql.Composed(
            [
                sql.SQL('SELECT DISTINCT unnest(labels) AS label'),
                sql.SQL(' FROM '),
                sql.SQL('.').join(
                    [sql.Identifier('pgraf'), sql.Identifier(table)]
                ),
                sql.SQL(' WHERE labels IS NOT NULL '),
                sql.SQL(' ORDER BY label'),
            ]
        )
        async with self._postgres.execute(query) as cursor:
            return [row['label'] for row in await cursor.fetchall()]  # type: ignore

    async def _get_properties(self, table: str) -> list[str]:
        """Retrieve the distincty property names across all nodes"""
        query = sql.Composed(
            [
                sql.SQL(
                    'SELECT DISTINCT jsonb_object_keys(properties) AS key'
                ),
                sql.SQL(' FROM '),
                sql.SQL('.').join(
                    [sql.Identifier('pgraf'), sql.Identifier(table)]
                ),
                sql.SQL(' WHERE properties IS NOT NULL'),
                sql.SQL(' ORDER BY key'),
            ]
        )
        async with self._postgres.execute(query) as cursor:
            return [row['key'] for row in await cursor.fetchall()]  # type: ignore

    async def _add_embeddings(self, node_id: uuid.UUID, content: str) -> None:
        """Chunk the content and write the embeddings"""
        for offset, value in enumerate(self._embeddings.get(content)):
            async with self._postgres.callproc(
                'pgraf.add_embedding',
                {'node': node_id, 'chunk': offset, 'value': value},
            ) as cursor:
                result: dict[str, bool] = await cursor.fetchone()  # type: ignore
                if not result['success']:
                    raise errors.DatabaseError('Failed to insert embedding')

__init__(url, pool_min_size=1, pool_max_size=10)

Initialize a new PGraf instance.

Parameters:

Name Type Description Default
url PostgresDsn

PostgreSQL connection URL

required
pool_min_size int

Minimum number of connections in the pool

1
pool_max_size int

Maximum number of connections in the pool

10
Source code in pgraf/graph.py
def __init__(
    self,
    url: pydantic.PostgresDsn,
    pool_min_size: int = 1,
    pool_max_size: int = 10,
) -> None:
    """Initialize a new PGraf instance.

    Args:
        url: PostgreSQL connection URL
        pool_min_size: Minimum number of connections in the pool
        pool_max_size: Maximum number of connections in the pool
    """
    self._embeddings = embeddings.Embeddings()
    self._postgres = postgres.Postgres(url, pool_min_size, pool_max_size)

aclose() async

Close the Postgres connection pool.

Source code in pgraf/graph.py
async def aclose(self) -> None:
    """Close the Postgres connection pool."""
    await self._postgres.aclose()

add_edge(source, target, labels=None, properties=None) async

Add an edge, linking two nodes in the graph

Source code in pgraf/graph.py
async def add_edge(
    self,
    source: uuid.UUID,
    target: uuid.UUID,
    labels: list[str] | None = None,
    properties: dict | None = None,
) -> models.Edge:
    """Add an edge, linking two nodes in the graph"""
    value = models.Edge(
        source=source,
        target=target,
        labels=labels or [],
        properties=properties or {},
    )
    async with self._postgres.callproc(
        'pgraf.add_edge', value, models.Edge
    ) as cursor:
        return await cursor.fetchone()  # type: ignore

add_node(labels, properties=None, created_at=None, modified_at=None, mimetype=None, content=None) async

Add a node to the graph

Source code in pgraf/graph.py
async def add_node(
    self,
    labels: list[str],
    properties: dict | None = None,
    created_at: datetime.datetime | None = None,
    modified_at: datetime.datetime | None = None,
    mimetype: str | None = None,
    content: str | None = None,
) -> models.Node:
    """Add a node to the graph"""
    value = models.Node(
        labels=labels,
        properties=properties or {},
        mimetype=mimetype,
        content=content,
    )
    if created_at is not None:
        value.created_at = created_at
    if modified_at is not None:
        value.modified_at = modified_at
    async with self._postgres.callproc(
        'pgraf.add_node', value, models.Node
    ) as cursor:
        result: models.Node = await cursor.fetchone()  # type: ignore
    if value.content is not None:
        await self._add_embeddings(value.id, value.content)
    return result

delete_edge(source, target) async

Remove an edge, severing the relationship between two nodes

Note: This is a directional operation. It only removes the edge going from source to target, not from target to source.

Source code in pgraf/graph.py
async def delete_edge(self, source: uuid.UUID, target: uuid.UUID) -> bool:
    """Remove an edge, severing the relationship between two nodes

    Note: This is a directional operation. It only removes the edge going
    from source to target, not from target to source.
    """
    async with self._postgres.callproc(
        'pgraf.delete_edge', {'source': source, 'target': target}
    ) as cursor:
        result: dict[str, int] = await cursor.fetchone()  # type: ignore
        return result['count'] == 1

delete_node(node_id) async

Retrieve a node by ID

Source code in pgraf/graph.py
async def delete_node(self, node_id: uuid.UUID) -> bool:
    """Retrieve a node by ID"""
    async with self._postgres.callproc(
        'pgraf.delete_node', {'id': node_id}
    ) as cursor:
        result: dict[str, int] = await cursor.fetchone()  # type: ignore
        return result['count'] == 1

get_edge(source, target) async

Retrieve an edge from source to target

Note: This is a directional operation. It only retrieves the edge going from source to target, not from target to source.

Source code in pgraf/graph.py
async def get_edge(
    self, source: uuid.UUID, target: uuid.UUID
) -> models.Edge | None:
    """Retrieve an edge from source to target

    Note: This is a directional operation. It only retrieves the edge going
    from source to target, not from target to source.
    """
    async with self._postgres.callproc(
        'pgraf.get_edge', {'source': source, 'target': target}, models.Edge
    ) as cursor:
        if cursor.rowcount == 0:
            return None
        return await cursor.fetchone()  # type: ignore

get_edge_labels() async

Retrieve all of the edge labels in the graph

Source code in pgraf/graph.py
async def get_edge_labels(self) -> list[str]:
    """Retrieve all of the edge labels in the graph"""
    return await self._get_labels('edges')

get_edge_properties() async

Retrieve all of the edge property names in the graph

Source code in pgraf/graph.py
async def get_edge_properties(self) -> list[str]:
    """Retrieve all of the edge property names in the graph"""
    return await self._get_properties('edges')

get_edges(labels=None, properties=None) async

Get edges by criteria

Source code in pgraf/graph.py
async def get_edges(
    self, labels: list[str] | None = None, properties: dict | None = None
) -> typing.AsyncGenerator[models.Edge, None]:
    """Get edges by criteria"""
    statement, parameters = self._build_statement(
        queries.GET_EDGES, labels, properties
    )
    async with self._postgres.execute(
        statement, parameters, models.Edge
    ) as cursor:
        async for row in cursor:
            yield models.Edge.model_validate(row)

get_node(node_id) async

Retrieve a node by ID

Source code in pgraf/graph.py
async def get_node(self, node_id: uuid.UUID | None) -> models.Node | None:
    """Retrieve a node by ID"""
    async with self._postgres.callproc(
        'pgraf.get_node', {'id': node_id}, models.Node
    ) as cursor:
        if cursor.rowcount == 1:
            return await cursor.fetchone()  # type: ignore
        return None

get_node_labels() async

Retrieve all of the node types in the graph

Source code in pgraf/graph.py
async def get_node_labels(self) -> list[str]:
    """Retrieve all of the node types in the graph"""
    return await self._get_labels('nodes')

get_node_properties() async

Retrieve the distincty property names across all nodes

Source code in pgraf/graph.py
async def get_node_properties(self) -> list[str]:
    """Retrieve the distincty property names across all nodes"""
    return await self._get_properties('nodes')

get_nodes(labels=None, properties=None) async

Get all nodes matching the criteria

Source code in pgraf/graph.py
async def get_nodes(
    self, labels: list[str] | None = None, properties: dict | None = None
) -> typing.AsyncGenerator[models.Node, None]:
    """Get all nodes matching the criteria"""
    statement, parameters = self._build_statement(
        queries.GET_NODES, labels, properties
    )
    async with self._postgres.execute(
        statement, parameters, models.Node
    ) as cursor:
        async for row in cursor:
            yield models.Node.model_validate(row)

initialize() async

Ensure the database is connected and ready to go.

Source code in pgraf/graph.py
async def initialize(self) -> None:
    """Ensure the database is connected and ready to go."""
    await self._postgres.initialize()

search(query, labels=None, properties=None, similarity_threshold=0.1, limit=10, offset=0) async

Search the content nodes in the graph, optionally filtering by properties, node types, and the edges labels.

Source code in pgraf/graph.py
async def search(
    self,
    query: str,
    labels: list[str] | None = None,
    properties: dict | None = None,
    similarity_threshold: float = 0.1,
    limit: int = 10,
    offset: int = 0,
) -> list[models.SearchResult]:
    """Search the content nodes in the graph, optionally filtering by
    properties, node types, and the edges labels.

    """
    vector = self._embeddings.get(query)
    if len(vector) > 1:
        LOGGER.warning(
            'Search text embeddings returned %i vector arrays', len(vector)
        )
    async with self._postgres.callproc(
        'pgraf.search',
        {
            'query': query,
            'labels': labels,
            'properties': json.Jsonb(properties) if properties else None,
            'embeddings': vector[0],
            'similarity': similarity_threshold,
            'limit': limit,
            'offset': offset,
        },
        models.SearchResult,
    ) as cursor:
        results: list[models.SearchResult] = await cursor.fetchall()  # type: ignore
        return results

traverse(start_node, node_labels=None, edge_labels=None, direction='outgoing', max_depth=5, limit=25) async

Traverse the graph from a starting node

Source code in pgraf/graph.py
async def traverse(
    self,
    start_node: uuid.UUID,
    node_labels: list[str] | None = None,
    edge_labels: list[str] | None = None,
    direction: str = 'outgoing',
    max_depth: int = 5,
    limit: int = 25,
) -> list[tuple[models.Node, models.Edge | None]]:
    """Traverse the graph from a starting node"""
    results: list[tuple[models.Node, models.Edge | None]] = []
    visited_nodes = set()  # Track visited nodes to avoid duplicates

    # Recursive helper function to implement depth-first traversal
    async def traverse_recursive(node_id, current_depth=0, path_edge=None):
        # Check the limit
        if len(results) >= limit:
            return

        # Check max depth
        if current_depth > max_depth:
            return

        # Check if we've visited this node
        if node_id in visited_nodes:
            return

        # Mark this node as visited
        visited_nodes.add(node_id)

        # Get the current node
        current_node = await self.get_node(node_id)
        if not current_node:
            return

        # Apply node label filtering
        if node_labels and not any(
            label in current_node.labels for label in node_labels
        ):
            # Only filter at depth > 0 to ensure starting node is included
            if current_depth > 0:
                return

        # Add this node to results
        results.append((current_node, path_edge))

        if current_depth >= max_depth:
            return

        # Build SQL query based on direction
        if direction == 'outgoing':
            query = sql.SQL(
                'SELECT * FROM pgraf.edges WHERE source = %(node_id)s'
            )
        elif direction == 'incoming':
            query = sql.SQL(
                'SELECT * FROM pgraf.edges WHERE target = %(node_id)s'
            )
        else:  # both
            query = sql.SQL(
                """\
                SELECT *
                  FROM pgraf.edges
                 WHERE source = %(node_id)s
                    OR target = %(node_id)s
                """
            )

        # Get all edges connected to this node
        async with self._postgres.execute(
            query, {'node_id': node_id}
        ) as cursor:
            edges = await cursor.fetchall()

            # Process each edge
            for edge_row in edges:
                # Skip edges that don't match filter criteria
                if edge_labels and not any(
                    label in edge_row['labels'] for label in edge_labels
                ):
                    continue

                # Create the edge model
                edge = models.Edge(
                    source=edge_row['source'],
                    target=edge_row['target'],
                    labels=edge_row['labels'],
                    properties=edge_row['properties'],
                )

                # Determine the next node ID based on direction
                next_id = edge_row['target']
                if direction == 'incoming' or (
                    direction == 'both' and edge_row['target'] == node_id
                ):
                    next_id = edge_row['source']

                # Skip if it's the current node
                if next_id == node_id:
                    continue

                # Recursively traverse
                await traverse_recursive(next_id, current_depth + 1, edge)

                # Check if limit reached
                if len(results) >= limit:
                    return

    await traverse_recursive(start_node)
    LOGGER.debug(
        'Traverse results: %s items, visited %s nodes',
        len(results),
        len(visited_nodes),
    )
    return results

update_edge(edge) async

Update an edge

Source code in pgraf/graph.py
async def update_edge(self, edge: models.Edge) -> models.Edge:
    """Update an edge"""
    async with self._postgres.callproc(
        'pgraf.update_edge', edge, models.Edge
    ) as cursor:
        return await cursor.fetchone()  # type: ignore

update_node(node) async

Update a node

Source code in pgraf/graph.py
async def update_node(self, node: models.Node) -> models.Node:
    """Update a node"""
    async with self._postgres.callproc(
        'pgraf.update_node', node, models.Node
    ) as cursor:
        result: models.Node = await cursor.fetchone()  # type: ignore
    if result.content is not None:
        await self._add_embeddings(result.id, result.content)
    return result